|
1 """ |
|
2 Full-text searching of logs |
|
3 """ |
|
4 |
|
5 import datetime, calendar, pytz |
|
6 |
|
7 import HyperEstraier as hype |
|
8 |
|
9 import log_line |
|
10 |
|
class LogSearchIndex (object) :
    """
        An index on the logs for a group of channels.

        This uses Hyper Estraier to handle searching, whereby each log line is stored as its own document
        (yes, I have a powerful server).

        These log documents have the following attributes:
            @uri        - channel/date/line
            @channel    - channel id
            @type       - the LogType id
            @timestamp  - UTC timestamp
            @source     - nickname

        Each document then has a single line of data, which is the log message itself
    """

    def __init__ (self, path, mode='r') :
        """
            Open the database at the given path, with the given mode:
                r       - read-only
                w       - read-write, create if not exists
                a       - read-write, do not create
                *       - read-write, truncate and create new

            Raises KeyError for a mode that is not listed above, and Exception if the database cannot be opened.
        """

        # mapping of { mode -> flags }
        mode_to_flag = {
            'r':    hype.Database.DBREADER,
            'w':    hype.Database.DBREADER | hype.Database.DBWRITER | hype.Database.DBCREAT,
            'a':    hype.Database.DBREADER | hype.Database.DBWRITER,
            '*':    hype.Database.DBREADER | hype.Database.DBWRITER | hype.Database.DBCREAT | hype.Database.DBTRUNC,
        }

        # look up flags
        flags = mode_to_flag[mode]

        # make instance
        self.db = hype.Database()

        # open; a false return value is treated as failure
        if not self.db.open(path, flags) :
            raise Exception("Index open failed: %s" % (path, ))

    def insert (self, channel, lines) :
        """
            Adds a sequence of LogLines from the given LogChannel to the index.

            Each line becomes one document carrying the @uri, @channel, @type and @timestamp attributes (plus
            @source when the line has one), with the line's data as the document text.

            Raises Exception if the database refuses a document.
        """

        # validate the LogChannel
        assert channel.name

        # iterate
        for line in lines :
            # validate the LogLine
            # NOTE(review): an offset of zero fails this check as well - confirm that offsets start at one
            assert line.offset
            assert line.timestamp

            # create new document
            doc = hype.Document()

            # line date
            date = line.timestamp.date()

            # convert to UTC timestamp (seconds since the epoch)
            utc_timestamp = calendar.timegm(line.timestamp.utctimetuple())

            # ensure that it's not 1900
            # (presumably guards against timestamps that were parsed without a year - TODO confirm)
            assert date.year != 1900

            # add URI
            doc.add_attr('@uri', "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))

            # add channel id
            doc.add_attr('@channel', channel.id)

            # add type
            doc.add_attr('@type', str(line.type))

            # add UTC timestamp
            doc.add_attr('@timestamp', str(utc_timestamp))

            # add source attribute?
            if line.source :
                doc.add_attr('@source', str(line.source))

            # add data text, stored as UTF-8 encoded bytes
            doc.add_text(line.data.encode('utf8'))

            # put
            # NOTE(review): per the Hyper Estraier API docs, PDCLEAN asks for the region of an overwritten
            #   document to be cleaned up - confirm that this binding's constant means the same
            if not self.db.put_doc(doc, hype.Database.PDCLEAN) :
                raise Exception("Index put_doc failed")

    def search_cond (self, cond) :
        """
            Search using a raw hype.Condition.

            Yields a (channel_id, LogLine) tuple for each document id returned by the search, rebuilt from the
            document's stored attributes and text.
        """

        # execute search, unused 'flags' arg stays zero
        results = self.db.search(cond, 0)

        # iterate over the document IDs
        for doc_id in results :
            # load document, this throws an exception...
            # option constants are hype.Database.GDNOATTR/GDNOTEXT, neither of which is passed here
            doc = self.db.get_doc(doc_id, 0)

            # load the attributes/text
            channel_id = doc.attr('@channel')
            line_type = int(doc.attr('@type'))
            timestamp = datetime.datetime.fromtimestamp(int(doc.attr('@timestamp')), pytz.utc)
            source = doc.attr('@source')

            # NOTE(review): insert() stores the text UTF-8 encoded, but it is not decoded again here - confirm
            #   what type LogLine expects for its data
            data = doc.cat_texts()

            # build+yield to (channel_id, LogLine) tuple; the first LogLine argument is left as None
            yield (channel_id, log_line.LogLine(None, line_type, timestamp, source, data))

    def search_simple (self, channel, query) :
        """
            Search for lines from the given channel for the given simple query.

            Yields the matching LogLines, ordered by ascending @timestamp.
        """

        # build condition
        cond = hype.Condition()

        # simplified phrase
        cond.set_options(hype.Condition.SIMPLE)

        # add channel attribute, so that only documents from this channel match
        cond.add_attr("@channel STREQ %s" % (channel.id, ))

        # add phrase
        cond.set_phrase(query)

        # set order
        cond.set_order("@timestamp NUMA")

        # search with cond
        for channel_id, line in self.search_cond(cond) :
            # sanity-check that the attribute condition was honoured
            assert channel_id == channel.id

            yield line
|
153 |
|
def cmd_load (options, channel_name, date) :
    """
        Loads the logs for a specific channel/date into the index.

            options         - parsed command-line options; index_path and create_index are read
            channel_name    - name to look up in channels.channel_list
            date            - the date to load, as a 'YYYY-MM-DD' string

        The index is opened in truncate-and-create mode when options.create_index is set, and otherwise in
        read-write mode without creating it.
    """

    import channels

    # open the LogSearchIndex
    index = LogSearchIndex(options.index_path, '*' if options.create_index else 'a')

    # open the channel
    channel = channels.channel_list.lookup(channel_name)

    # parse date
    naive_date = datetime.datetime.strptime(date, '%Y-%m-%d')
    tz = channel.source.tz

    # attach the channel's timezone; pytz timezones must go through localize(), since replace(tzinfo=...)
    # would attach the zone's first (usually LMT) offset. Other tzinfo objects keep the plain replace().
    if hasattr(tz, 'localize') :
        date = tz.localize(naive_date)

    else :
        date = naive_date.replace(tzinfo=tz)

    # load lines for date
    lines = channel.source.get_date(date)

    # insert
    index.insert(channel, lines)
|
175 |
|
def cmd_search (options, channel_name, query) :
    """
        Search the index for events on a specific channel with the given query.

            options         - parsed command-line options; index_path and formatter are read
            channel_name    - name to look up in channels.channel_list
            query           - the simple query phrase to search for

        The matching lines are printed as plaintext, formatted by options.formatter.
    """

    import channels

    # open the LogSearchIndex read-only: searching never writes, and it must never truncate the index,
    # whatever options.create_index happens to be
    index = LogSearchIndex(options.index_path, 'r')

    # open the channel
    channel = channels.channel_list.lookup(channel_name)

    # search
    lines = index.search_simple(channel, query)

    # display as plaintext
    for line in options.formatter.format_txt(lines) :
        print(line)
|
195 |
|
if __name__ == '__main__' :
    # command-line entry point: parses the options, then dispatches "<command> args..." to the module-level
    # cmd_<command> function
    from optparse import OptionParser
    import log_formatter

    # define parser
    parser = OptionParser(
        usage           = "%prog [options] <command> [ ... ]",
        add_help_option = True,
    )

    # define command-line arguments
    parser.add_option("-I", "--index", dest="index_path", help="Index database path", metavar="PATH", default="logs/index")

    # a plain flag: without store_true, optparse would swallow the next argument as this option's value
    parser.add_option("--create", dest="create_index", action="store_true", help="Create index database", default=False)
    parser.add_option("-f", "--formatter", dest="formatter_name", help="LogFormatter to use", default="irssi")
    parser.add_option("-z", "--timezone", dest="tz_name", help="Timezone for output", metavar="TZ", default="UTC")

    # parse
    options, args = parser.parse_args()

    # a command is required; parser.error prints the usage message and exits
    if not args :
        parser.error("no command given")

    # postprocess stuff
    options.tz = pytz.timezone(options.tz_name)
    options.formatter = log_formatter.by_name(options.formatter_name)(options.tz)

    # pop command
    command = args.pop(0)

    # inspect
    func = globals().get('cmd_%s' % command)

    if func is None :
        parser.error("unknown command: %s" % (command, ))

    # call, the remaining arguments become the command's positional arguments
    func(options, *args)