"""
Parse syslog lines in text format.
"""
import datetime, time
import re
import logging; log = logging.getLogger('pvl.syslog.parser')
class SyslogParser(object):
    """
        Parse text lines in the default syslogd format into item dicts.

        Each parsed item is a dict with the keys:
            timestamp   - datetime.datetime (naive)
            host        - hostname from the header, or None in raw mode
            prog        - normalized program name from the tag, or None
            pid         - pid from the tag as a string, or None
            msg         - message text following the tag

        Items can optionally be filtered by program name.
    """

    # default syslogd format
    SYSLOG_RE = re.compile(
        # the timestamp+hostname header
        r"(?P<timestamp>\w{3} [0-9 ]\d \d{2}:\d{2}:\d{2}) (?P<hostname>\S+) "
        # the message, including possible tag/pid
        + r"(?P<message>(?P<tag>(?P<program>[^:\]]+)(?:\[(?P<pid>\d+)\])?: )?(?P<text>.*))\n?"
    )

    # strptime format of the (year-less) syslog timestamp
    TIMESTAMP_FMT = '%b %d %H:%M:%S'

    def __init__(self, raw=False, prog=None):
        """
            raw     - treat every line as a bare message with no syslog header;
                      items then use the current time and the given prog
            prog    - program name to filter on, or to assume in raw mode;
                      a trailing '*' makes the filter a prefix match
        """

        self.raw = raw
        self.prog = prog

    def parse_timestamp(self, match):
        """
            Parse the timestamp group of a SYSLOG_RE match into a datetime.

            The syslog timestamp carries no year. The most recent year is
            chosen for which the date exists and does not lie more than one
            day ahead of the current local time, so December lines read in
            January land in the previous year, and "Feb 29" lands in a leap year.

            Raises ValueError if the timestamp is not a valid date/time.
        """

        timestamp = match.group('timestamp')

        # parse against a fixed leap year, so that "Feb 29" is accepted
        # independently of the year this happens to run in
        parsed = datetime.datetime.strptime('2000 ' + timestamp, '%Y ' + self.TIMESTAMP_FMT)

        now = datetime.datetime.now()

        # tolerate some clock skew between the logging host and us
        latest = now + datetime.timedelta(days=1)

        # any 9 consecutive years contain a leap year, so this always finds one
        for year in range(now.year, now.year - 9, -1):
            try:
                candidate = parsed.replace(year=year)

            except ValueError:
                # Feb 29 in a non-leap year
                continue

            if candidate <= latest:
                return candidate

        raise ValueError("No plausible year for syslog timestamp: %r" % (timestamp, ))

    def parse_prog(self, match):
        """
            Parse the normalized program name from a SYSLOG_RE match.

            The name is lower-cased, and a full path is reduced to its basename.

            Returns None if the line has no tag.
        """

        prog = match.group('program')

        if not prog:
            # no tag
            return None

        # normalize
        prog = prog.lower()

        if prog.startswith('/'):
            # base
            prog = prog.split('/')[-1]

        return prog

    def parse(self, line):
        """
            Parse given input line into an item dict.

            In raw mode, the whole (stripped) line is the message.

            Returns None if the line cannot be parsed as a syslog message.
        """

        # ignore whitespace
        line = line.strip()

        # debug
        log.debug("%s", line)

        if self.raw:
            # from defaults
            return dict(
                timestamp   = datetime.datetime.now(), # XXX: None?
                host        = None,
                prog        = self.prog,
                pid         = None,
                msg         = line,
            )

        match = self.SYSLOG_RE.match(line)
        timestamp = None

        if match:
            try:
                timestamp = self.parse_timestamp(match)

            except ValueError:
                # matched the pattern, but is not a real date, e.g. "Feb 30"
                match = None

        if not match:
            log.warning("Unparseable syslog message: %r", line)
            return None

        return dict(
            timestamp   = timestamp,
            host        = match.group('hostname'),
            prog        = self.parse_prog(match),
            pid         = match.group('pid'),
            msg         = match.group('text'),
        )

    def match_prog(self, prog):
        """
            Match given prog against our prog filter?

            The comparison is case-insensitive, since parse_prog() lower-cases
            the names it returns. A filter ending in '*' is a prefix match.

            Requires self.prog to be set; filter() only calls this when it is.
        """

        if not prog:
            # never matches non-tagged lines
            return False

        prog = prog.lower()
        pattern = self.prog.lower()

        if pattern.endswith('*'):
            # prefix match
            return prog.startswith(pattern[:-1])

        return prog == pattern

    def filter(self, line, item):
        """
            Filter given item?

            Returns True if the item should be kept, False if it is empty
            (unparseable line) or rejected by the prog filter.
        """

        if not item:
            log.debug("empty: %r", line)
            return False

        if self.prog and not self.match_prog(item['prog']):
            log.debug("prog: %r", item)
            return False

        # ok
        return True

    def process(self, lines):
        """
            Yield parsed item dicts from given series of lines, skipping
            unparseable and filtered-out lines.
        """

        for line in lines:
            item = self.parse(line)

            # filter?
            if self.filter(line, item):
                yield item