--- a/.hgignore Tue Feb 19 19:28:40 2013 +0200
+++ b/.hgignore Tue Feb 19 21:23:40 2013 +0200
@@ -7,7 +7,6 @@
# generated files
^dist/
^MANIFEST$
-^twisted/plugins/dropin.cache$
^log
# testing
--- a/bin/pvl.irk Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,76 +0,0 @@
-#!/usr/bin/env python
-
-"""
- Test Irk
-"""
-
-__version__ = '0.1'
-
-import pvl.args
-import pvl.irk
-
-import sys, pvl.syslog.tail # XXX: for sys.stdin
-
-import logging, optparse
-
-log = logging.getLogger('main')
-
-def parse_options (argv) :
- """
- Parse command-line arguments.
- """
-
- prog = argv[0]
-
- parser = optparse.OptionParser(
- prog = prog,
- usage = '%prog: [options]',
- version = __version__,
-
- # module docstring
- description = __doc__,
- )
-
- # options
- parser.add_option_group(pvl.args.parser(parser))
- parser.add_option_group(pvl.irk.parser(parser))
-
- parser.add_option('--join', action='store_true', help="Join given targets")
- parser.add_option('--part', action='store_true', help="Part given targets")
-
- # parse
- options, args = parser.parse_args(argv[1:])
-
- # apply
- pvl.args.apply(options, prog)
-
- return options, args
-
-def main (argv) :
- options, args = parse_options(argv)
-
- log.info("Connect IRK..")
- irker = pvl.irk.apply(options)
-
- log.info("Load targets...")
- targets = [irker.target(target, join=options.join) for target in args]
-
- log.info("Send messages...")
- for line in pvl.syslog.file.File(sys.stdin) :
- log.info("%s", line)
-
- for target in targets :
- target(line)
-
- if options.part :
- for target in targets :
- target.part()
-
- # done
- log.info("Exiting...")
- return 0
-
-if __name__ == '__main__':
- import sys
-
- sys.exit(main(sys.argv))
--- a/bin/pvl.irker Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,7 +0,0 @@
-#!/usr/bin/python
-
-from pvl.irker.irker import main
-
-if __name__ == '__main__' :
- import sys
- sys.exit(main(sys.argv[1:]))
--- a/bin/pvl.irker-syslog Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,188 +0,0 @@
-#!/usr/bin/env python
-
-"""
- Syslog -> Irk
-"""
-
-__version__ = '0.0'
-
-import pvl.args
-import pvl.syslog.args
-import pvl.syslog.rule
-import pvl.irk
-
-import logging, optparse
-
-log = logging.getLogger('main')
-
-def parse_options (argv) :
- """
- Parse command-line arguments.
- """
-
- prog = argv[0]
-
- parser = optparse.OptionParser(
- prog = prog,
- usage = '%prog: [options]',
- version = __version__,
-
- # module docstring
- description = __doc__,
- )
-
- # options
- parser.add_option_group(pvl.args.parser(parser))
-
- # input
- parser.add_option_group(pvl.syslog.args.parser(parser))
- parser.add_option_group(pvl.syslog.rule.parser(parser))
- parser.add_option_group(pvl.irk.parser(parser, connect=None))
-
- parser.add_option('--irker-target', metavar='IRC',
- help="Irker target URL")
-
- # parse
- options, args = parser.parse_args(argv[1:])
-
- # apply
- pvl.args.apply(options, prog, rootok=False)
-
- return options, args
-
-def apply_irker (irker) :
- """
- Handle irker activity.
- """
-
- for msg in irker.irk :
- log.info("irk: %s", msg)
-
-def apply_syslog (options, syslog, rules, irker) :
- """
- Handle syslog activity.
- """
-
- # syslogs
- for item in syslog :
- match, rulepath, apply = rules.apply(item)
-
- log.debug("%s: %s: %s", item, rulepath, apply)
-
- target = apply.get('irk', options.irker_target)
-
- tag = '/'.join(str(rule) for rule in reversed(rulepath[:-1]))
- text = apply.get('text')
-
- log.info("%s: %s: %s", target, tag, text)
-
- if not text :
- # XXX: plain irk = ... in rule is broken, as it always applies, and skips any further rules
- continue
-
- if irker and target :
- irker[target]('[' + tag + '] ' + text)
- else :
- print tag, text
-
-def close_irker (irker) :
- """
- Shutdown irker before quitting.
-
- XXX: irker.close() to disconnect?
- """
-
- log.info("Shutting down IRK...")
-
- for target in list(irker) :
- log.warn("%s", target)
- del irker[target]
-
-def close_syslog (syslog) :
- """
- Shutdown syslog before quitting
- """
-
- # XXX: do all sources support close()?
- log.warn("%s", syslog)
- syslog.close()
-
-def main (argv) :
- options, args = parse_options(argv)
-
- # no args
- if args :
- log.error("Usage: pvl.irker-syslog [options]")
- return 2
-
- # setup
- log.info("Open syslog...")
- syslog = pvl.syslog.args.apply(options)
-
- log.info("Load rules...")
- rules = pvl.syslog.rule.apply(options)
-
- log.info("Connect IRK..")
- irker = pvl.irk.apply(options)
-
- if options.irker_target :
- # pre-join target
- irker[options.irker_target]
-
- log.info("Process syslog messages...")
-
- # customized mainloop that supports irker.irk
- while True :
- try :
- # TODO: seprate IrkError, to not confuse irk write vs syslog read eof
- apply_syslog(options, syslog, rules, irker)
-
- except EOFError as ex :
- log.error("syslog: EOF")
-
- close_irker(irker)
-
- # 0 is controlled exit
- return 0
-
- except pvl.irk.IrkError as ex :
- log.error("irker: %s", ex)
-
- # XXX: copy-pasta
- close_syslog(syslog)
- return 0
-
- # quit unless we have something to poll
- if not syslog.poll :
- break
-
- # is irk pollable?
- if irker.irk.recv :
- reading = (irker.irk, )
- else :
- reading = ()
-
- poll = syslog.select(syslog.poll, reading=reading) or () # timeout -> ()
-
- if irker.irk in poll :
- # irks?
- try :
- apply_irker(irker)
-
- except EOFError :
- log.error("irk: EOF")
-
- close_syslog(syslog)
-
- # exit 0, so as to restart sooner
- # XXX: maybe use a special exit code instead?
- return 0
-
- # done
- log.info("Exiting...")
- return 0
-
-if __name__ == '__main__':
- import sys
-
- sys.exit(main(sys.argv))
--- a/etc/default/pvl-irker Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,6 +0,0 @@
-# configuration for /etc/init.d/pvl-irker
-
-#DAEMON="/usr/bin/twistd"
-#PIDFILE="/var/run/pvl-irker/pvl-irker.pid"
-#DAEMON_ARGS="--uid=pvl-irker --gid=pvl-irker --rundir=/var/run/pvl-irker --logfile=/var/log/pvl-irker.log"
-TWISTD_ARGS="--listen 6659"
--- a/etc/init.d/pvl-irker Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,164 +0,0 @@
-#! /bin/sh
-### BEGIN INIT INFO
-# Provides: irker
-# Required-Start: $remote_fs $network
-# Required-Stop: $remote_fs $network
-# Default-Start: 2 3 4 5
-# Default-Stop: 0 1 6
-# Short-Description: pvl-irker
-# Description: irk -> IRC gateway
-### END INIT INFO
-
-# Author: Tero Marttila <terom@paivola.fi>
-
-# Do NOT "set -e"
-
-# PATH should only include /usr/* if it runs after the mountnfs.sh script
-PATH=/sbin:/usr/sbin:/bin:/usr/bin
-
-NAME=pvl-irker
-DESC=$NAME
-SCRIPTNAME=/etc/init.d/$NAME
-
-DAEMON_NAME=twistd
-DAEMON=/usr/bin/$DAEMON_NAME
-
-TWISTD=pvl-irker
-TWISTD_ARGS=
-
-PIDFILE=/var/run/$NAME.pid
-
-DAEMON_ARGS=
-
-# Exit if the package is not installed
-[ -x "$DAEMON" ] || exit 0
-
-# Read configuration variable file if it is present
-[ -r /etc/default/$NAME ] && . /etc/default/$NAME
-
-# Load the VERBOSE setting and other rcS variables
-. /lib/init/vars.sh
-
-# Define LSB log_* functions.
-# Depend on lsb-base (>= 3.2-14) to ensure that this file is present
-# and status_of_proc is working.
-. /lib/lsb/init-functions
-
-#
-# Function that starts the daemon/service
-#
-do_start()
-{
- # Return
- # 0 if daemon has been started
- # 1 if daemon was already running
- # 2 if daemon could not be started
- start-stop-daemon --start --quiet --pidfile $PIDFILE --exec $DAEMON --test > /dev/null \
- || return 1
- start-stop-daemon --start --quiet --pidfile $PIDFILE --exec $DAEMON -- \
- --pidfile=$PIDFILE $DAEMON_ARGS $TWISTD $TWISTD_ARGS \
- || return 2
-
- # Add code here, if necessary, that waits for the process to be ready
- # to handle requests from services started subsequently which depend
- # on this one. As a last resort, sleep for some time.
-}
-
-#
-# Function that stops the daemon/service
-#
-do_stop()
-{
- # Return
- # 0 if daemon has been stopped
- # 1 if daemon was already stopped
- # 2 if daemon could not be stopped
- # other if a failure occurred
- start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE --name $DAEMON_NAME
- RETVAL="$?"
- [ "$RETVAL" = 2 ] && return 2
- # Wait for children to finish too if this is a daemon that forks
- # and if the daemon is only ever run from this initscript.
- # If the above conditions are not satisfied then add some other code
- # that waits for the process to drop all resources that could be
- # needed by services started subsequently. A last resort is to
- # sleep for some time.
- start-stop-daemon --stop --quiet --oknodo --retry=0/30/KILL/5 --exec $DAEMON
- [ "$?" = 2 ] && return 2
- # Many daemons don't delete their pidfiles when they exit.
- rm -f $PIDFILE
- return "$RETVAL"
-}
-
-#
-# Function that sends a SIGHUP to the daemon/service
-#
-do_reload() {
- #
- # If the daemon can reload its configuration without
- # restarting (for example, when it is sent a SIGHUP),
- # then implement that here.
- #
- start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE --name $DAEMON_NAME
- return 0
-}
-
-case "$1" in
- start)
- [ "$VERBOSE" != no ] && log_daemon_msg "Starting $DESC" "$NAME"
- do_start
- case "$?" in
- 0|1) [ "$VERBOSE" != no ] && log_end_msg 0 ;;
- 2) [ "$VERBOSE" != no ] && log_end_msg 1 ;;
- esac
- ;;
- stop)
- [ "$VERBOSE" != no ] && log_daemon_msg "Stopping $DESC" "$NAME"
- do_stop
- case "$?" in
- 0|1) [ "$VERBOSE" != no ] && log_end_msg 0 ;;
- 2) [ "$VERBOSE" != no ] && log_end_msg 1 ;;
- esac
- ;;
- status)
- status_of_proc "$DAEMON" "$NAME" && exit 0 || exit $?
- ;;
- #reload|force-reload)
- #
- # If do_reload() is not implemented then leave this commented out
- # and leave 'force-reload' as an alias for 'restart'.
- #
- #log_daemon_msg "Reloading $DESC" "$NAME"
- #do_reload
- #log_end_msg $?
- #;;
- restart|force-reload)
- #
- # If the "reload" option is implemented then remove the
- # 'force-reload' alias
- #
- log_daemon_msg "Restarting $DESC" "$NAME"
- do_stop
- case "$?" in
- 0|1)
- do_start
- case "$?" in
- 0) log_end_msg 0 ;;
- 1) log_end_msg 1 ;; # Old process is still running
- *) log_end_msg 1 ;; # Failed to start
- esac
- ;;
- *)
- # Failed to stop
- log_end_msg 1
- ;;
- esac
- ;;
- *)
- #echo "Usage: $SCRIPTNAME {start|stop|restart|reload|force-reload}" >&2
- echo "Usage: $SCRIPTNAME {start|stop|status|restart|force-reload}" >&2
- exit 3
- ;;
-esac
-
-:
--- a/etc/syslog.conf.dist Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,74 +0,0 @@
-#irk = irc://syslog@irc-test/test
-
-# TODO: implements meta-attrs across rule tree to classify hosts?
-#[tag]
-# [[puppetmaster]]
-# host = guru
-#
-# [[auth-high]]
-# host = guru
-
-# auth on normal hosts
-[auth]
- facility = auth*
-
- [[pam]]
- pattern = (?P<pam>pam_\w+)\((?P<pam_service>.+?):(?P<pam_type>.+?)\): (?P<msg>.+)
-
- # at least debian wheezy's pam_unix syslogs session open/close at LOG_INFO
- [[[pam-sudo]]]
- pam_service = sudo
- severity = info
- format = # ignore
-
- [[sudo]]
- program = sudo
- pattern = (?P<login>\S+) : TTY=(?P<tty>\S+) ; PWD=(?P<pwd>.+?) ; USER=(?P<user>\S+) ; (?:ENV=(?P<env>.+?) ; )?COMMAND=(?P<command>.*)
- format = {login}:{tty} - {user}@{host}:{pwd} - {command!r}
-
- # ignore puppet readshadow on puppetmasters
- [[[puppet_readshadow]]]
- login = puppet
- user = root
- command = /usr/bin/getent shadow \w+
- format = # ignore
-
- [[[env]]]
- env = .+
- format = {login}:{tty} - {user}@{host}:{pwd} - {env}{command!r}
-
- [[sudo-unknown]]
- program = sudo
- format = {host} {msg}
-
-# auth on high-sec hosts
-[auth-high]
- host = .+
- facility = auth*
-
- # TODO: pubkey, failures?
- [[ssh]]
- program = sshd
- pattern = Accepted (?P<auth>.+?) for (?P<user>\S+) from (?P<ip>\S+) port (?P<port>\S+) (?P<proto>\S+)
- format = SSH {auth} login for {user}@{host} from {ip}
-
- [[cron]]
- program = cron
- format = # ignore
-
- [[su_nobody]]
- program = su
- pattern = Successful su for nobody by root|\+ \?\?\? root:nobody
- format = # ignore
-
- [[all]]
- format = {host} {msg}
-
-# user
-[user]
- facility = user
-
- [[puppet]]
- program = puppet
- format = {host} {msg}
-
--- a/pvl/args.py Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,115 +0,0 @@
-"""
- CLI argument handling; common stuff: logging
-"""
-
-import optparse
-import logging
-
-import pwd, grp, os, sys
-
-import logging; log = logging.getLogger('pvl.args')
-
-def parser (parser) :
- """
- Return an optparse.OptionGroup.
- """
-
- general = optparse.OptionGroup(parser, "General options")
-
- general.add_option('-q', '--quiet', dest='loglevel', action='store_const', const=logging.ERROR, help="Less output")
- general.add_option('-v', '--verbose', dest='loglevel', action='store_const', const=logging.INFO, help="More output")
- general.add_option('-D', '--debug', dest='loglevel', action='store_const', const=logging.DEBUG, help="Even more output")
- general.add_option('--log-file', help="Log to file")
- general.add_option('--debug-module', action='append', metavar='MODULE',
- help="Enable logging for the given logger/module name")
-
- general.add_option('--uid', help="Change uid")
- general.add_option('--gid', help="Change gid")
-
- # defaults
- parser.set_defaults(
- logname = parser.prog,
- loglevel = logging.WARN,
- debug_module = [],
- )
-
- return general
-
-def options (**options) :
- """
- Synthensise options.
- """
-
- return optparse.Values(options)
-
-def apply_setid (options, rootok=None) :
- """
- Drop privileges if running as root.
-
- XXX: this feature isn't very useful (import-time issues etc), but in certain cases (syslog-ng -> python),
- it's difficult to avoid this without some extra wrapper tool..?
- """
-
- # --uid -> pw
- if not options.uid :
- pw = None
- elif options.uid.isdigit() :
- pw = pwd.getpwuid(int(options.uid))
- else :
- pw = pwd.getpwnam(options.uid)
-
- # --gid -> gr
- if not options.gid and not pw :
- gr = None
- elif not options.gid :
- gr = grp.getgrgid(pw.pw_gid)
- elif options.gid.isdigit() :
- gr = grp.getgrgid(str(options.gid))
- else :
- gr = grp.getgrnam(options.gid)
-
- if gr :
- # XXX: secondary groups? seem to get cleared
- log.info("setgid: %s: %s", gr.gr_name, gr.gr_gid)
- os.setgid(gr.gr_gid)
-
- if pw :
- log.info("setuid: %s: %s", pw.pw_name, pw.pw_uid)
- os.setuid(pw.pw_uid)
-
- elif os.getuid() == 0 :
- if rootok :
- log.info("running as root")
- else :
- log.error("refusing to run as root, use --uid 0 to override")
- sys.exit(2)
-
-def apply (options, logname=None, rootok=True) :
- """
- Apply the optparse options.
- """
-
- if logname :
- prefix = options.logname + ': '
- else :
- prefix = ''
-
- # configure
- logging.basicConfig(
- # XXX: log Class.__init__ as Class, not __init__?
- format = prefix + '%(name)-20s: %(levelname)5s %(funcName)s: %(message)s',
- level = options.loglevel,
- filename = options.log_file,
- )
-
- # TODO: use --quiet for stdout output?
- options.quiet = options.loglevel > logging.WARN
-
- if options.uid or options.gid or not rootok :
- # set uid/gid
- apply_setid(options, rootok=rootok)
-
- # enable debugging for specific targets
- for logger in options.debug_module :
- logging.getLogger(logger).setLevel(logging.DEBUG)
-
--- a/pvl/invoke.py Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,184 +0,0 @@
-"""
- Invoke external commands, with python kwargs -> options mangling.
-"""
-
-import subprocess
-import logging
-
-log = logging.getLogger('pvl.invoke')
-
-class InvokeError (Exception) :
- def __init__ (self, cmd, exit, error) :
- self.cmd = cmd
- self.exit = exit
- self.error = error
-
- def __str__ (self) :
- return "{self.cmd} failed ({self.exit}): {self.error}".format(self=self)
-
-def invoke (cmd, args, stdin=None) :
- """
- Invoke a command directly.
-
- stdin:
- False -> passthrough stdin/stdout
- None -> return lines of stdout
- [lines] -> write lines on stdin, return lines of stdout
-
- Raises InvokeError on nonzero exit, otherwise log.warn's any stderr.
- """
-
- log.debug("{cmd} {args}".format(cmd=cmd, args=' '.join(args)))
-
- if stdin is False :
- # keep process stdin/out
- io = None
- input = None
-
- elif stdin :
- # return stdout, give stdin
- io = subprocess.PIPE
- input = '\n'.join(stdin) + '\n'
-
- else :
- # return stdout
- io = subprocess.PIPE
- input = None
-
- p = subprocess.Popen([cmd] + args, stdin=io, stdout=io, stderr=io)
-
- # get output
- # returns None if not io
- stdout, stderr = p.communicate(input=input)
-
- if p.returncode :
- # failed
- raise InvokeError(cmd, p.returncode, stderr)
-
- elif stderr :
- log.warning("%s: %s", cmd, stderr)
-
- if stdout :
- return stdout.splitlines()
- else :
- return None
-
-import collections
-
-def process_opt (name, value) :
- """
- Mangle from python keyword-argument dict format to command-line option tuple format.
-
- >>> process_opt('foo', True)
- ('--foo',)
- >>> process_opt('foo', 2)
- ('--foo', '2')
- >>> process_opt('foo', 'bar')
- ('--foo', 'bar')
- >>> process_opt('foo_bar', 'asdf')
- ('--foo-bar', 'asdf')
-
- # multi
- >>> process_opt('foo', ['bar', 'quux'])
- ('--foo', 'bar', '--foo', 'quux')
- >>> process_opt('foo', [False, 'bar', True])
- ('--foo', 'bar', '--foo')
-
- # empty
- >>> process_opt('foo', False)
- ()
- >>> process_opt('foo', None)
- ()
- >>> process_opt('bar', '')
- ()
-
- Returns a tuple of argv items.
- """
-
- # mangle opt
- opt = '--' + name.replace('_', '-')
-
- if value is True :
- # flag opt
- return (opt, )
-
- elif not value :
- # flag opt / omit
- return ( )
-
- elif isinstance(value, basestring) :
- return (opt, value)
-
- elif isinstance(value, collections.Iterable) :
- opts = (process_opt(name, subvalue) for subvalue in value)
-
- # flatten
- return tuple(part for parts in opts for part in parts)
-
- else :
- # as-is
- return (opt, str(value))
-
-def optargs (*args, **kwargs) :
- """
- Convert args/options into command-line format
-
- >>> optargs('foo')
- ['foo']
- >>> optargs(foo=True)
- ['--foo']
- >>> optargs(foo=False)
- []
- >>> optargs(foo='bar')
- ['--foo', 'bar']
- """
-
- ## opts
- # process
- opts = [process_opt(opt, value) for opt, value in kwargs.iteritems()]
-
- # flatten
- opts = [str(part) for parts in opts for part in parts]
-
- ## args
- args = [str(arg) for arg in args if arg]
-
- return opts + args
-
-# XXX: move to pvl.utils or something random?
-def merge (*dicts, **kwargs) :
- """
- Merge given dicts together.
-
- >>> merge(foo=1, bar=2)
- {'foo': 1, 'bar': 2}
- >>> merge(dict(foo=1), bar=2)
- {'foo': 1, 'bar': 2}
- >>> merge(dict(foo=1), bar=2, foo=3)
- {'foo': 3, 'bar': 2}
- >>> merge(dict(foo=1), dict(bar=2), foo=3)
- {'foo': 3, 'bar': 2}
- >>> merge(dict(bar=2), dict(foo=1), foo=3)
- {'foo': 3, 'bar': 2}
-
- """
-
- return dict((k, v) for d in (dicts + (kwargs, )) for k, v in d.iteritems())
-
-
-def command (cmd, *args, **opts) :
- """
- Invoke a command with options/arguments, given via Python arguments/keyword arguments.
-
- Return stdout.
- """
-
- log.debug("{cmd} {opts} {args}".format(cmd=cmd, args=args, opts=opts))
-
- # invoke
- return invoke(cmd, optargs(*args, **opts))
-
-if __name__ == '__main__':
- import doctest
- doctest.testmod()
-
--- a/pvl/irk.py Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,240 +0,0 @@
-"""
- Irker client.
-"""
-
-import pvl.syslog.file # for stdin
-import pvl.socket # for tcp
-
-import optparse, sys
-
-import logging; log = logging.getLogger('pvl.irk')
-
-import json
-
-def parser (parser, connect='tcp://localhost/', target=None) :
- """
- Optparse option group.
- """
-
- irker = optparse.OptionGroup(parser, 'Irker output')
-
- irker.add_option('--irker', metavar='URL', default=connect,
- help="Irker daemon URL")
-
- irker.add_option('--irker-notice', action='store_true',
- help="Use irker NOTICE")
-
- irker.add_option('--irker-part', action='store_true',
- help="Use irker PART")
-
- return irker
-
-def apply (options) :
- """
- Return Irker (XXX: target) from options.
- """
-
- # None -> stdout
- return Irker(options.irker, options) # options.irker_*
-
-class IrkError (Exception) :
- """
- Irk write error.
- """
-
-class Irk (object) :
- """
- Irker JSON connection speaks JSON over a stream.
-
- TODO: timeouts?
- """
-
- PORT = 6659
-
- @classmethod
- def connect (cls, url) :
- """
- Connect to given URL string, or None -> stdout
- """
-
- if not url :
- # no read
- return cls(pvl.syslog.file.File(sys.stdout), recv=False)
-
- else :
- sock = pvl.socket.connect(url, port=cls.PORT)
-
- # just to make things a bit more exciting... and we really don't want to be blocking on our output..
- sock.setblocking(False)
-
- return cls(
- pvl.socket.WriteStream(sock, buffer=None),
- pvl.socket.ReadStream(sock)
- )
-
- def __init__ (self, send, recv=None) :
- """
- Use given file-like object (write, flush, fileno) for output.
- """
-
- self.send = send
- self.recv = recv
-
- log.debug("%s <-> %s", send, recv)
-
- def fileno (self) :
- """
- Return fd. Useful for detecting error conditions (connection lost).
-
- Only valid if self.recv is True.
- """
-
- return self.recv.fileno()
-
- def __call__ (self, **opts) :
- """
- Send given json.
-
- Raises IrkError on write EOF.
-
- XXX: Raises socket.error/IOError on write errors?
- """
-
- log.debug("%s", opts)
-
- try :
- # write line + flush
- self.send(json.dumps(opts))
-
- except EOFError as ex :
- # XXX: also socket.error etc?
- raise IrkError("%s: send eof: %s" % (self, ex))
-
- # XXX: self.send.flush()
-
- def __iter__ (self) :
- """
- Yield JSON inputs from source.
- """
-
- if not self.recv :
- # never going to be anything
- return
-
- for line in self.recv :
- # XXX: error handling?
- yield json.loads(line)
-
-class IrkerTarget (object) :
- """
- A channel on an Irk connection.
-
- Raises IrkError if irk(..) fails.
- """
-
- def __init__ (self, irker, target, notice=None, part=None) :
- self.irker = irker
- self.target = target
-
- self._notice = notice
- self._part = part
-
- def join (self) :
- log.info("%s", self)
- self.irker(to=str(self), privmsg='')
-
- def privmsg (self, *args) :
- for arg in args :
- log.info("%s: %s", self, arg)
- self.irker(to=str(self), privmsg=arg)
-
- def notice (self, *args) :
- for arg in args :
- log.info("%s: %s", self, arg)
- self.irker(to=str(self), notice=arg)
-
- def part (self, msg='') :
- log.info("%s: %s", self, msg)
-
- if self._part :
- self.irker(to=str(self), part=msg)
- else :
- log.warn("%s: no --irker-part", self)
-
- def __call__ (self, *args) :
- # default msg policy
- if self._notice :
- return self.notice(*args)
- else :
- return self.privmsg(*args)
-
- def __str__ (self) :
- return self.target
-
-class Irker (object) :
- """
- Reconnecting Irk.
- """
-
- def __init__ (self, url=None, options=None) :
- """
- url - irker to connect to
- options - irker_* configs
- """
-
- self.url = url
- self.targets = {}
- self.options = options
-
- self.connect()
-
- def connect (self) :
- """
- Connect, and fix up our targets.
- """
-
- self.irk = Irk.connect(self.url)
-
- # rejoin
- for target in self.targets.itervalues() :
- target.join()
-
- def __call__ (self, **opts) :
- """
- Send on current irker connection.
-
- Raises IrkError if irk(..) fails.
-
- TODO: handle errors and reconnect?
- """
-
- self.irk(**opts)
-
- def target (self, target, join=True) :
- """
- Bind to given target URL, returning an IrkerTarget for sending messages.
- """
-
- if target not in self.targets :
- self.targets[target] = IrkerTarget(self, target,
- notice = self.options and self.options.irker_notice,
- part = self.options and self.options.irker_part,
- )
-
- if join :
- self.targets[target].join()
-
- return self.targets[target]
-
- __getitem__ = target
-
- def __delitem__ (self, target) :
- """
- Unbind given target URL.
- """
-
- target = self.targets.pop(target)
- target.part()
-
- def __iter__ (self) :
- return iter(self.targets)
--- a/pvl/irker/__init__.py Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,5 +0,0 @@
-"""
- Twisted-based irker implementation.
-"""
-
-__version__ = '0.1dev'
--- a/pvl/irker/irc.py Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,462 +0,0 @@
-"""
- IRC client, dispatching irker messages.
-"""
-
-from twisted.internet import reactor, interfaces, protocol, defer, error
-from twisted.words.protocols import irc
-
-from twisted.internet import endpoints
-
-from twisted.python import log
-
-PORT = 6667
-
-def url2endpoint (reactor, url) :
- """
- Turn given urlparse URL into an endpoint.
-
- Raises KeyError on unknown scheme.
- """
-
- SCHEMES = {
- 'irc': lambda : endpoints.TCP4ClientEndpoint(reactor, url.hostname, url.port or PORT),
- }
-
- return SCHEMES[url.scheme]()
-
-def normalize (name) :
- """
- Normalize a channel/nickname for comparisons in IRC.
- """
-
- return name.lower()
-
-class IRCError (Exception) :
- """
- A handled protocol error.
- """
-
- pass
-
-class IRCChannel (object) :
- """
- A joined channel on an IRC server.
- """
-
- ENCODING = 'utf-8'
-
- def __init__ (self, client, channel, encoding=ENCODING) :
- self.client = client
- self.channel = channel
-
- self.encoding = encoding
-
- # TODO: separate join/part state
- #self.joining = self.parting = None
-
- def encode (self, unicode) :
- if unicode :
- return unicode.encode(self.encoding)
- else :
- return None
-
- def privmsg (self, *msgs) :
- for msg in msgs :
- # XXX: encode
- self.client.msg(self.channel, self.encode(msg))
-
- def notice (self, *msgs) :
- for msg in msgs :
- self.client.notice(self.channel, self.encode(msg))
-
- def part (self, msg=None) :
- """
- Remove channel from our list of channels.
- """
-
- # send the PART
- self.client.leave(self.channel, self.encode(msg))
-
- # ...and then immediately forget the channel, in case we rejoin before getting the part back
- # TODO: self.joining/parting
- self.client._close_channel(self.channel)
-
- def errback (self, failure) :
- """
- Fail any pending requests.
- """
-
- log.msg('IRCChannel.errback', self, failure)
-
- def __str__ (self) :
- return self.client.url(self.channel)
-
-class IRCClient (irc.IRCClient) :
- """
- A connection to an IRC server with a specific, requested nickname.
-
- Joins to channels.
- """
-
- performLogin = False
-
- def __init__ (self, factory) :
- self.factory = factory
-
- self.nickname = None
- self.hostname = None
-
- self._registering = None
- self._channels = { }
-
- # TODO: smarter/configurable queueing?
- self.lineRate = 1.0
-
- def connectionMade (self) :
- self.hostname = self.transport.getPeer().host
- self.transport.logPrefix = self.logPrefix
-
- log.msg("connectionMade", self, self.transport)
- irc.IRCClient.connectionMade(self)
-
- def sendLine (self, line) :
- irc.IRCClient.sendLine(self, line)
-
- log.msg(">>>", line)
-
- def lineReceived (self, line) :
- log.msg("<<<", line)
-
- irc.IRCClient.lineReceived(self, line)
-
- ## Register
- def register (self, nickname, username=None, password=None) :
- """
- Register to the server, choosing a nickname based on the given nickname.
-
- Returns a Deferred that callbacks with our actual nickname once we have registered, or errbacks with an IRCError.
- """
-
- if self._registering :
- raise Exception("register: already registering")
-
- self.username = username
- self.password = password
-
- log.msg("register", nickname)
- irc.IRCClient.register(self, nickname)
-
- # defer
- d = self._registering = defer.Deferred()
-
- return d
-
- # irc_ERR_NICKNAMEINUSE
- # alterCollidedNick
- # irc_ERR_ERRONEUSNICKNAME
-
- def irc_ERR_PASSWDMISMATCH (self, prefix, params) :
- err = IRCError('ERR_PASSWDMISMATCH')
- log.err(err)
- self._registering.errback(err)
-
- def irc_RPL_WELCOME (self, prefix, params) :
- self.hostname = prefix
- irc.IRCClient.irc_RPL_WELCOME(self, prefix, params)
-
- def signedOn (self) :
- log.msg("signedOn", self.nickname)
- irc.IRCClient.signedOn(self)
-
- # defer
- d = self._registering
-
- if not d :
- raise Exception("signedOn: not registering?")
-
- self._registering = None
-
- d.callback(self.nickname)
-
- ## Channels
- def join (self, channel, key=None) :
- """
- Join the given channel.
-
- Returns a deferred that callbacks with the IRCChannel once joined, or errbacks.
- """
-
- irc.IRCClient.join(self, channel, key=key)
-
- d = self._channels[normalize(channel)] = defer.Deferred()
-
- return d
-
- # ERR_CHANNELISFULL
- # ERR_INVITEONLYCHAN
- # ERR_BANNEDFROMCHAN
- # ERR_BADCHANNELKEY
-
- def _close_channel (self, channel) :
- """
- Remove channel from our list of channels.
-
- TODO: purge queued messages for channel?
- """
-
- del self._channels[normalize(channel)]
-
- def left (self, channel) :
- if normalize(channel) in self._channels :
- if isinstance(self._channels[normalize(channel)], defer.Deferred) :
- log.msg('IRCClient.left: part during join:', channel)
-
- else :
- log.msg('IRCClient.left: unexpected part:', channel)
- self._close_channel(channel)
-
- # XXX: assume this is: send PART, send JOIN, receive PART, receive JOIN
- #self._close_channel(channel)
- else :
- log.msg("IRCClient.left: parted channel:", channel)
-
- def kickedFrom (self, channel, kicker, message) :
- log.msg('IRCClient.kicked', channel, kicker, message)
-
- self._close_channel(channel)
-
- def joined (self, channel) :
- """
- Have joined given channel.
- """
-
- lookup = normalize(channel)
-
- d = self._channels[lookup]
- channel = self._channels[lookup] = IRCChannel(self, channel)
- d.callback(channel)
-
- @defer.inlineCallbacks
- def channel (self, channel, key=None) :
- """
- Defer a joined IRCChannel.
- """
-
- lookup = normalize(channel)
-
- log.msg('IRCClient.channel', lookup, channel)
-
- if lookup not in self._channels :
- channel = yield self.join(channel, key)
- else :
- # wait or get
- yield self._channels[lookup]
-
- channel = self._channels[lookup]
-
- log.msg('IRCClient.channel', lookup, channel)
-
- defer.returnValue(channel)
-
- ##
- def irc_ERR_CANNOTSENDTOCHAN (self, prefix, params) :
- nick, channel, error = params
-
- log.err(IRCError(channel, error))
-
- ## Quit
- def irc_ERROR (self, prefix, params) :
- msg, = params
- error = IRCError(None, msg)
-
- log.err(error)
-
- if self._registering :
- self._registering.errback(error)
- self._registering = None
-
- def connectionLost (self, reason) :
- irc.IRCClient.connectionLost(self, reason)
- log.err(reason)
-
- if self._registering :
- self._registering.errback(reason)
- self._registering = None
-
- # unregister channels
- for channel in self._channels :
- # errback Deferred or IRCChannel
- self._channels[channel].errback(reason)
-
- self._channels = { }
-
- # unregister client
- self.factory.clientLost(self)
-
- ## Logging
- def url (self, target=None) :
- """
- Format as URL.
- """
-
- if not self.transport : return 'IRC'
-
- # XXX: no isinstance() support
- if interfaces.ITCPTransport.providedBy(self.transport) :
- scheme = 'irc'
- else :
- # TODO: ssl?
- scheme = None
-
- peer = self.transport.getPeer()
-
- if peer.port == PORT :
- peer = "{irc.hostname}".format(irc=self, peer=peer)
- else :
- peer = "{irc.hostname}:{peer.port}".format(irc=self, peer=peer)
-
- if target :
- path = str(target)
- else :
- path = None
-
- return ''.join(part for part in (
- scheme, '://' if scheme else None,
- self.nickname, '@' if self.nickname else None,
- peer,
- '/' if path else None, path
- ) if part)
-
- __str__ = url
- logPrefix = url
-
-class IRCFactory (protocol.ClientFactory) :
- """
- Manage Clients and Targets
- """
-
- NICKNAME = 'irker'
-
- def __init__ (self, nickname=NICKNAME, username=None) :
- # default nickname
- self.nickname = nickname
- self.username = username
-
- # (scheme, host, port, nick) -> IRCClient
- self.clients = {}
-
- def buildProtocol (self, addr) :
- return IRCClient(self)
-
- def clientLost (self, client) :
- """
- Given IRCClient is no more.
- """
-
- log.msg("IRCFactory.clientLost", client)
-
- # remove from our clients
- self.clients = dict((k, c) for k, c in self.clients.iteritems() if c != client)
-
- @defer.inlineCallbacks
- def connect (self, url) :
- """
- Defer a connected, registered Client for given URL.
- """
-
- endpoint = url2endpoint(reactor, url)
-
- log.msg('IRCFactory.connect', url, ':', endpoint)
-
- # connect
- try :
- client = yield endpoint.connect(self)
-
- except error.ConnectError as ex :
- log.err(ex, ': '.join(str(x) for x in ('IRCFactory.connect', url, endpoint)))
- raise
-
- else :
- log.msg('IRCFactory.connect', url, ':', endpoint, ':', client)
-
- # register
- try :
- nickname = yield client.register(url.username or self.nickname,
- username = self.username,
- password = url.password,
- )
-
- except Exception as ex :
- log.err("register", ex)
- raise
-
- log.msg('IRCFactory.connect', url, ':', endpoint, ':', client, ':', nickname)
-
- # okay!
- defer.returnValue(client)
-
- @defer.inlineCallbacks
- def client (self, url) :
- """
- Return IRCClient for given URL.
- """
-
- lookup = (url.scheme, url.hostname, url.port, url.username)
-
- if lookup not in self.clients :
- # deferred for connect
- connect = self.clients[lookup] = self.connect(url)
-
- try :
- # wait on deferred, and then store IRCClient
- self.clients[lookup] = yield connect
-
- except Exception as ex :
- # failed, remove the attempted connect
- del self.clients[lookup]
- raise
-
- else :
- # wait for result, if deferred
- # XXX: this yields None, since the first inlineCallbacks yielding on the deferred returns None in its callback
- yield self.clients[lookup]
-
- # connected client
- client = self.clients.get(lookup)
-
- if client :
- log.msg('IRCFactory.client', url, ":", client)
-
- defer.returnValue(client)
- else :
- log.msg('IRCFactory.client', url, ": client connect failed")
-
- # XXX: get failure from first yield's errback... except inlineCallbacks drops it and goes to callback with None <_<
- raise Exception("Client connect failed")
-
- @defer.inlineCallbacks
- def target (self, url) :
- """
- Return IRCChannel for given URL.
- """
-
- client = yield self.client(url)
-
- channel = '#' + url.path.lstrip('/')
- channel = yield client.channel(channel)
-
- log.msg('IRCFactory.target', url, ":", channel)
-
- defer.returnValue(channel)
-
- @defer.inlineCallbacks
- def privmsg (self, url, *msg) :
- """
- Dispatch given messages to given target.
- """
-
- target = yield self.target(url)
-
- log.msg('IRCFactory.privmsg', url, ":", target, ':', *msg)
-
- target.privmsg(*msg)
--- a/pvl/irker/irk.py Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,133 +0,0 @@
-"""
- Irker protocol implementation.
-"""
-
-from twisted.internet import protocol, defer
-from twisted.protocols import basic
-from twisted.python import log
-
-import json, urlparse
-
-class Irk (basic.LineOnlyReceiver) :
- """
- A connected Irk client.
- """
-
- delimiter = '\n'
-
- def connectionMade (self) :
- self.transport.logPrefix = self.logPrefix
-
- log.msg("connected", self)
-
- def connectionLost (self, reason) :
- log.err("connection lost", reason)
-
- def error (self, *args) :
- log.err(*args)
- self.transport.loseConnection()
-
- def lineReceived (self, line) :
- """
- JSON -> in
- """
-
- try :
- irk = json.loads(line)
-
- except ValueError as ex :
- # invalid
- return self.error(ex, line)
-
- # dispatch
- self.factory.irkReceived(irk).addErrback(self.error, line)
-
- def __str__ (self) :
- if not self.transport : return 'Irk'
-
- host = self.transport.getHost()
- peer = self.transport.getPeer()
-
- return "{host.host}:{host.port}:{peer.host}".format(host=host, peer=peer)
-
- logPrefix = __str__
-
-class IrkFactory (protocol.ServerFactory) :
- """
- Manage connected Irk clients.
- """
-
- MAXATTR = 16
-
- protocol = Irk
-
- def __init__ (self, irc) :
- self.irc = irc
-
- @defer.inlineCallbacks
- def irkReceived (self, irk) :
- """
- Deffered to handle lookup of target, and then sending message.
-
- Errbacks on failures.
- """
-
- log.msg(str(irk))
-
- if not 'to' in irk :
- raise ValueError("missing target: to")
-
- # MUST NOT be unicode
- # XXX: ValueError?
- url = urlparse.urlparse(str(irk.pop('to')))
-
- # connect, register, join
- target = yield self.irc.target(url)
-
- # dispatch attrs
- for attr, value in irk.iteritems() :
- if len(attr) >= self.MAXATTR or not attr.islower() :
- raise ValueError("invalid attr: %s" % (attr, ))
-
- method = getattr(self, 'irk_' + attr, None)
-
- if not method :
- raise ValueError("unknown attr: %s" % (attr, ))
-
- if value :
- value = unicode(value)
- else :
- value = None
-
- method(target, value)
-
- # XXX: explicitly enable?
- def irk_privmsg (self, target, value) :
- """
- Send PRIVMSG to target.
- """
-
- if not value :
- # legacy
- return
-
- target.privmsg(value)
-
- def irk_notice (self, target, value) :
- """
- Send NOTICE to target.
- """
-
- if not value :
- raise ValueError("empty notice")
-
- target.notice(value)
-
- # TODO: refcounting vs join!
- def irk_part (self, target, value) :
- """
- PART target.
- """
-
- # value is optional
- target.part(value)
--- a/pvl/irker/irker.py Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,162 +0,0 @@
-"""
- Twisted application
-"""
-
-import sys
-
-from twisted.application import internet, service
-from twisted.internet import reactor, endpoints, defer
-from twisted.python import usage, log
-
-import urlparse
-
-import pvl.irker.irk
-import pvl.irker.irc
-
-def myusername ():
- """
- Return username current process is running under.
- """
-
- import os, pwd
-
- return pwd.getpwuid(os.getuid()).pw_name
-
-
-class Options (usage.Options) :
- optParameters = [
- [ 'irc-nickname', 'n', pvl.irker.irc.IRCFactory.NICKNAME, "Default IRC nickname" ],
- [ 'irc-username', 'u', myusername(), "IRC username (default: system user)" ],
- ]
-
- def __init__ (self) :
- usage.Options.__init__(self)
-
- self.listen_tcp = []
- self.connect = []
- self.target = []
- self.privmsg = []
-
- def opt_listen_tcp (self, listen) :
- """
- Twisted endpoint.
- """
-
- if ':' in listen :
- host, port = listen.split(':')
- else :
- host, port = '', listen
-
- port = int(port)
-
- self.listen_tcp.append((host, port))
-
- def opt_connect (self, connect) :
- """
- Connect to given target.
- """
-
- self.connect.append(urlparse.urlparse(connect))
-
- def opt_target (self, target) :
- """
- Join given target.
- """
-
- self.target.append(urlparse.urlparse(target))
-
- def opt_privmsg (self, privmsg) :
- """
- Send message to targets
- """
-
- self.privmsg.append(privmsg)
-
-@defer.inlineCallbacks
-def connect (irc, connect) :
- """
- Connect to given urls.
- """
-
- try :
- clients = yield defer.gatherResults([irc.client(url) for url in connect])
-
- except Exception as ex :
- log.err(ex)
- return
-
- for client in clients :
- log.msg('--connect', client)
-
-@defer.inlineCallbacks
-def target (irc, target, privmsg) :
- """
- Connect to given urls.
- """
-
- try :
- targets = yield defer.gatherResults([irc.target(url) for url in target])
-
- except Exception as ex :
- log.err(ex)
- return
-
- for target in targets :
- log.msg('--target', target)
-
- target.privmsg(*privmsg)
-
-def makeService (options) :
- """
- Return a Service for running irk -> irc.
- """
-
- s = service.MultiService()
-
- # IRC
- irc = pvl.irker.irc.IRCFactory(options['irc-nickname'], options['irc-username'])
-
- connect(irc, options.connect)
- target(irc, options.target, options.privmsg)
-
- # IRK
- irk = pvl.irker.irk.IrkFactory(irc)
-
- for host, port in options.listen_tcp :
- ss = internet.TCPServer(port, irk, interface=host)
-
- log.msg("--listen", port)
-
- ss.setServiceParent(s)
-
- # return the service collection
- return s
-
-def main (args) :
- options = Options()
- options.parseOptions(args)
-
- # logging
- log.startLogging(sys.stderr, setStdout=False)
-
- # connect
- irc = pvl.irker.irc.IRCFactory(options['irc-nickname'], options['irc-username'])
-
- connect(irc, options.connect)
- target(irc, options.target, options.privmsg)
-
- # listen
- irk = pvl.irker.irk.IrkFactory(irc)
-
- for host, port in options.listen_tcp :
- endpoint = endpoints.TCP4ServerEndpoint(reactor, port, interface=host)
-
- log.msg("listen:", endpoint)
-
- endpoint.listen(irk)
-
- # go
- reactor.run()
-
- return 0
-
--- a/pvl/socket.py Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,379 +0,0 @@
-"""
- A simple TCP client in the kind of syslog.fifo/file.
-
- Interface: fileno(), __iter__, __call__
-"""
-
-# XXX: absolute import plz
-socket = __import__('socket')
-
-import select
-import errno
-
-import urlparse
-
-import logging; log = logging.getLogger('pvl.socket')
-
-# order matters!
-URL = (
- # scheme family socktype
- ( 'unix', (socket.AF_UNIX, None ) ), # socktype is given
- ( 'tcp', (0, socket.SOCK_STREAM ) ), # AF_UNSPEC
- ( 'udp', (0, socket.SOCK_DGRAM ) ), # AF_UNSPEC
-)
-
-URL_SCHEMES = dict(URL)
-
-def parse (str, port=None, scheme='tcp', unix=socket.SOCK_DGRAM) :
- """
- Parse given string into (AF_*, SOCK_*, host, port).
-
- For AF_UNIX, the path is in host, and port is empty, and the socktype is the given unix=... value.
- """
-
- family, socktype = URL_SCHEMES[scheme]
- url = urlparse.urlparse(str)
-
- # TODO: UNIX?
- if url.scheme and url.netloc :
- # proper url
- family, socktype = URL_SCHEMES[url.scheme]
-
- return family, socktype, url.hostname, url.port or port
-
- elif url.scheme and url.path :
- # host:port
- return family, socktype, url.scheme, int(url.path)
-
- elif url.path :
- # host
- return family, socktype, url.path, port
-
- else :
- raise ValueError("unparseable connect URL: %s", str)
-
-def connect (str, *args, **kwargs) :
- """
- Returns a connected socket for given parse()'d string.
- """
-
- family, socktype, host, port = parse(str, *args, **kwargs)
-
- if family == socket.AF_UNIX :
- raise ValueError("XXX: AF_UNIX is not yet supported", str)
-
- else : # AF_UNSPEC
- return connect_inet(host, port, family=family, socktype=socktype)
-
-def connect_inet (host=None, port=None, family=socket.AF_UNSPEC, socktype=socket.SOCK_STREAM) :
- """
- Return a TCP/UDP socket connected to the given host/port using getaddrinfo.
-
- TODO: timeout?
- """
-
- log.debug("%s:%s: family=%s, socktype=%s", host, port, family, socktype)
-
- if host :
- flags = socket.AI_CANONNAME
- else :
- flags = 0
-
- addrinfo = socket.getaddrinfo(host, port, family, socktype, 0, flags)
-
- if not addrinfo :
- raise Exception("getaddrinfo: %s:%s: no results" % (host, port))
-
- for af, st, proto, name, addr in addrinfo :
- try :
- sock = socket.socket(af, st, proto)
-
- except socket.error as error :
- log.warning("%s:%s: socket: %s", host, port, error)
- continue
-
- log.debug("%s:%s: socket: %s", host, port, sock)
-
- try :
- sock.connect(addr)
-
- except socket.error as error :
- log.warning("%s:%s: connect: %s", host, port, error)
- continue
-
- log.debug("%s:%s: connect", host, port)
- log.info("%s", name)
-
- return sock
-
- else :
- raise Exception("Unable to connect: %s:%s: %s" % (host, port, error))
-
-def reverse (sockaddr, numeric_host=False, numeric_port=True) :
- """
- Resolve given sockaddr, returning (host, port).
- """
-
- flags = 0
-
- if numeric_host :
- flags |= socket.NI_NUMERICHOST
-
- if numeric_port :
- flags |= socket.NI_NUMERICSERV
-
- return socket.getnameinfo(sockaddr, flags)
-
-def socket_str (sock) :
- # get connected peer
- try :
- peer = sock.getpeername()
-
- except socket.error as ex :
- # fails if socket is not connected XXX: even after EOF on read..?
- return str(ex)
-
- # lookup scheme
- for scheme, (family, socktype) in URL :
- if family and family != sock.family :
- continue
- elif socktype and socktype != sock.type :
- continue
- else :
- break
- else :
- scheme = None
-
- host, port = reverse(peer)
-
- if scheme :
- return "{scheme}://{host}:{port}".format(scheme=scheme, host=host, port=port)
- else :
- return "{host}:{port}".format(host=host, port=port)
-
-def nonblocking (call, *args, **kwargs) :
- """
- Call the given function, which read/writes on a nonblocking file, and return None if it would have blocked.
-
- Raises EOFError on SIGPIPE/EPIPE.
-
- # XXX: does python handle SIGPIPE for us?
- """
-
- try :
- return call(*args, **kwargs)
-
- except socket.error as ex :
- # block?
- if ex.errno == errno.EAGAIN or ex.errno == errno.EWOULDBLOCK:
- # empty
- return None
-
- elif ex.errno == errno.EPIPE :
- # XXX: write-eof?
- raise EOFError()
-
- else :
- raise
-
-class ReadStream (object) :
- """
- Buffered stream, supporting non-blocking/line-based reads.
- """
-
- BLOCK=512
-
- def __init__ (self, sock, buffer=None) :
- """
- TODO: buffer - maximum line length
- """
-
- self.sock = sock
- self._buf = ''
-
- def fileno (self) :
- return self.sock.fileno()
-
- def _read (self, block=BLOCK) :
- """
- Read up to n bytes from socket.
-
- Returns None if we would block.
- Raises EOFError on EOF.
- """
-
- buf = nonblocking(self.sock.recv, block)
-
- log.debug("%s: %s", self, buf)
-
- if buf is None :
- return None
- elif buf :
- return buf
- else :
- raise EOFError()
-
- def peek (self) :
- """
- Peek at data in buffer.
- """
-
- return self._buf
-
- def read (self) :
- """
- Read and return any available input.
-
- Returns None if blocking.
- """
-
- if self._buf :
- buf, self._buf = self._buf, ''
-
- else :
- buf = self._read()
-
- return buf
-
- def readline (self) :
- """
- Read and return next waiting line from input.
-
- Line is returned without trailing '\r\n' or '\n'.
-
- Returns None if there is no line available.
-
- XXX: trailing data in buf when _read() raises EOFError?
- """
-
- while '\n' not in self._buf :
- # read chunk
- read = self._read()
-
- if read is None :
- return None
-
- self._buf += read
-
- # split out one line
- line, self._buf = self._buf.split('\n', 1)
-
- # in case we had \r\n
- line = line.rstrip('\r')
-
- log.debug("%s: %s", self, line)
-
- return line
-
- def readlines (self) :
- """
- Read any available input, yielding lines.
-
- Returns None if thre is no more input available.
-
- Raises EOFError in the socket was closed.
- """
-
- while True :
- line = self.readline()
-
- if line is None :
- return
- else :
- yield line
-
- __iter__ = readlines
-
- def __str__ (self) :
- return socket_str(self.sock)
-
-class WriteStream (object) :
- """
- Writable stream, supporting non-blocking/buffered writes.
-
- XXX: buffering is completely untested
- """
-
- EOL = '\n'
-
- def __init__ (self, sock, buffer=None) :
- """
- TODO: buffer - maximum outgoing buffer length
- """
-
- self.sock = sock
- self._buf = buffer
-
- def _write (self, buf) :
- """
- Write given data to socket, returning the number of bytes written, or None, if buffering is enabled.
- """
-
- send = nonblocking(self.sock.send, buf)
-
- # eof on write?
- if send is None :
- return None
-
- elif send :
- # ok, message (partially) written
- return send
-
- else :
- # XXX: zero-length send? how do we handle this? What does it actually mean?
- # handle as a wouldblock...
- return None
-
- def write (self, data) :
- """
- Write given data to socket.
-
- TODO: buffer small chunks -> select writable -> write?
-
- Buffers if not able to write, or raises EOFError (hah!)
- """
-
- if not self._buf :
- # write directly
- while data :
- write = self._write(data)
-
- if write :
- # remaining data
- data = data[write:]
-
- else :
- # cannot write more
- break
-
- if not data :
- # sent
- return
-
- if self._buf is None :
- # no write buffering, and socket buffer full!
- raise EOFError()
-
- # append to outgoing buffer
- self._buf += data
-
- def writeline (self, line, eol=EOL) :
- """
- Write out line.
- """
-
- log.debug("%s: %s", self, line)
-
- self.write(str(line))
- self.write(eol)
-
- def __call__ (self, *lines) :
- for line in lines :
- self.writeline(line)
-
- # TODO: flush
-
- def __str__ (self) :
- return socket_str(self.sock)
-
-
--- a/pvl/syslog/args.py Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,101 +0,0 @@
-import optparse, sys
-
-from pvl.syslog.parser import SyslogParser
-from pvl.syslog.filter import SyslogFilter
-from pvl.syslog.syslog import SyslogSource
-from pvl.syslog import fifo, tail, file
-
-# XXX: use optparse parser.error()?
-import logging; log = logging.getLogger('pvl.syslog.args')
-
-def parser (parser, prog=None) :
- """
- Optparse option group
-
- prog - filter to only process lines from given process
- """
-
- syslog = optparse.OptionGroup(parser, 'Syslog collector')
-
- syslog.add_option('--syslog-fifo', metavar='PATH',
- help="Read syslog messages from given fifo")
-
- syslog.add_option('--syslog-file', metavar='FILE',
- help="Read syslog messages from given file")
-
- syslog.add_option('--syslog-tail', metavar='FILE',
- help="Continuously poll syslog messages given file")
-
- syslog.add_option('--syslog-stdin', action='store_true',
- help="Read syslog messages from stdin")
-
- syslog.add_option('--syslog-raw', action='store_true',
- help="Parse raw syslog lines without timestamp/etc")
-
- syslog.add_option('--syslog-facility', metavar='FACILITY',
- help="Set/filter by given facility")
-
- syslog.add_option('--syslog-severity', metavar='SEVERITY',
- help="Set given facility")
-
- syslog.add_option('--syslog-prog', metavar='PROG', default=prog,
- help="Filter by given prog: %default")
-
- return syslog
-
-def apply (options, optional=False) :
- """
- Handle options, returning a SyslogSource, if any.
-
- May log.error/sys.exit
- """
-
- # XXX: this belongs in pvl.syslog.source
- if options.syslog_fifo :
- # fifo pipe
- source = fifo.Fifo(options.syslog_fifo)
- poll = True # select(source)
-
- elif options.syslog_tail :
- # tail file
- source = tail.Tail(options.syslog_tail, skip=True)
- poll = tail.Tail.POLL # select(float)
-
- elif options.syslog_file :
- # read file
- source = file.File(open(options.syslog_file))
- poll = False # do not loop, just read up to EOF
-
- elif options.syslog_stdin :
- # read pipe
- source = fifo.Pipe.file(sys.stdin) # puts stdin into non-blocking mode
- poll = True # select(source)
-
- elif optional :
- return None
-
- else :
- # from stdin
- if sys.stdin.isatty() :
- log.warning("Reading syslog messages from TTY?")
-
- source = file.File(sys.stdin)
- poll = False # XXX: tty vs pipe vs file? False -> just block
-
- # options
- parser = SyslogParser(
- raw = options.syslog_raw,
- facility = options.syslog_facility,
- severity = options.syslog_severity,
- )
-
- # TODO: filter optional
- filter = SyslogFilter.build(
- # glob pattern
- prog = options.syslog_prog,
- facility = options.syslog_facility,
- #severity = options.sylog_severity, # XXX: match as greater-than?
- )
-
- # polling
- return SyslogSource(source, parser, filter, poll)
--- a/pvl/syslog/event.py Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,45 +0,0 @@
-import gevent.core as event
-
-class EventBase (object) :
- """
- libevent-style event base.
-
- XXX: just a wrapper around the gevent libevent bindings, uses implict state..
- """
-
- def __init__ (self) :
- event.init()
-
- def timer (self, timeout, cb) :
- return event.event(0, 0, timeout, cb)
-
- def read (self, fd, cb, timeout=-1) :
- return event.event(event.EV_READ, fd, cb)
-
- def main (self) :
- """
- Run mainloop until exit.
- """
-
- event.dispatch()
-
-class SyslogSource (object) :
- def __init__ (self, source, parser) :
- self.source = source
- self.parser = parser
-
- def event (self, event, evtype) :
- """
- Process source
- """
-
- # directly iter across source
- for item in self.parser.process(self.source) :
- yield item
-
- def process (self, item) :
- """
- Handle item from syslog.
- """
-
-
--- a/pvl/syslog/fifo.py Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,256 +0,0 @@
-"""
- Non-blocking fifo reads.
-"""
-
-import os
-import errno
-import fcntl
-
-import logging
-
-log = logging.getLogger('pvl.syslog.fifo')
-
-class Pipe (object) :
- """
- A pipe from a fd.
-
- Supports reading lines in a non-blocking fashion.
- """
-
- @classmethod
- def file (cls, file) :
- """
- Create Pipe from file, e.g. sys.stdin.
-
- Puts fd into nonblocking mode, which means that the given file will stop working!
- """
-
- fd = file.fileno()
-
- log.debug("%s: %s", file, fd)
-
- fl = fcntl.fcntl(fd, fcntl.F_GETFL)
- fl |= os.O_NONBLOCK
- fcntl.fcntl(fd, fcntl.F_SETFL, fl)
-
- return cls(fd)
-
- def __init__ (self, fd) :
- """
- May pass fd=None to open as closed.
- """
-
- self._fd = fd
- self._buf = ''
-
- log.debug("pipe: %d", fd)
-
- def open (self, fd) :
- """
- re-open closed pipe to use the given fd.
-
- Raises ValueError if already open.
- """
-
- if self._fd is None :
- self._fd = fd
- else :
- raise ValueError("%s: re-opening already open pipe: %s" % (self, fd))
-
- # XXX: good idea?
- def __nonzero__ (self) :
- """
- Test if we are open.
-
- XXX: signal EOF as well?
- """
-
- return self._fd is not None
-
- def fileno (self) :
- """
- Return the internal fd.
-
- Raises ValueError if we are closed.
- XXX: EOFError?
- """
-
- if self._fd is None :
- raise ValueError("I/O operation on closed pipe: %s" % (self, ))
- else :
- return self._fd
-
- # XXX: this is almost identical to pvl.socket.ReadStream
- def read (self, n=512) :
- """
- Read up to n bytes.
-
- Returns None if we would block.
- Raises EOFError on EOF, or closed.
- """
-
- try :
- buf = os.read(self.fileno(), n)
-
- except OSError as ex :
- # block?
- if ex.errno == errno.EAGAIN :
- # empty
- buf = None
-
- else :
- raise
-
- log.debug("%s: %s", self, buf)
-
- if buf is None :
- return None
- elif buf :
- return buf
- else :
- raise EOFError()
-
- def readline (self) :
- """
- Read and return next waiting line from input.
-
- Line is returned without trailing '\n'.
-
- Returns None if there is no line available.
- Raises EOFError if the fifo write end was closed.
- """
-
- while '\n' not in self._buf :
- # read chunk
- read = self.read()
-
- if read is None :
- return None
-
- self._buf += read
-
- # split out one line
- line, self._buf = self._buf.split('\n', 1)
-
- log.debug("%s", line)
-
- return line
-
- def readlines (self) :
- """
- Read any available input, yielding lines.
-
- Re-opens the FIFO on EOF.
-
- Returns None if there was no more input available, or the fifo was re-opened after EOF.
- """
-
- while True :
- # pull line
- line = self.readline()
-
- if line :
- yield line
- else :
- return # block
-
- __iter__ = readlines
-
- def close (self) :
- """
- Close our fd, if open.
-
- May be open()'d again. Meanwhile, all operatations will raise EOFError.
-
- log.warn's if already closed.
- """
-
- if self._fd is None :
- log.warn("%s: already closed", self)
-
- else :
- log.debug("%s: %s", self, self._fd)
-
- os.close(self._fd)
- self._fd = None
-
- def __str__ (self) :
- return "pipe({self._fd})".format(self=self)
-
-class Fifo (Pipe) :
- """
- A named pipe(7) on the filesystem.
-
- Supports reading lines in a non-blocking fashion, and re-opening on EOF.
- """
-
- def __init__ (self, path) :
- self.path = path
- Pipe.__init__(self, self._open())
-
- def _open (self) :
- """
- Open the internal fd (nonblocking).
- """
-
- fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
-
- log.debug("%s: open: %s", self, fd)
-
- return fd
-
- def open (self) :
- """
- Re-open the FIFO.
-
- Used when the writing end was closed, and read gave EOF. Opening the fifo again will clear the EOF condition,
- and resume nonblocking mode.
-
- Raises ValueError() if already open. close() first.
- """
-
- Pipe.open(self, self._open())
-
- def readlines (self) :
- """
- Read any available input, yielding lines.
-
- Re-opens the FIFO on EOF.
-
- Returns None if there was no more input available, or the fifo was re-opened after EOF.
- """
-
- while True :
- try :
- # pull line
- line = self.readline()
-
- except EOFError :
- log.debug("%s: EOF: reopen", self)
-
- # reopen and go back to waiting
- self.close()
- self.open()
-
- return
-
- if line is None :
- log.debug("%s: EOF: wait", self)
- return # wait
- else :
- yield line
-
- __iter__ = readlines
-
- def __str__ (self) :
- return self.path
-
- # XXX: we need to figure out what references we have lying around, and clean those out!
- def __del__ (self) :
- """
- Cleanup
- """
-
- if self._fd is not None :
- self.close()
-
--- a/pvl/syslog/file.py Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,98 +0,0 @@
-"""
- Iterate over lines in file-like objects (without buffering lines!), write (flushing output).
-"""
-
-import logging; log = logging.getLogger('pvl.syslog.file')
-
-class File (object) :
- """
- Follow a file-like object, reading lines until no more are available. Never raises EOFError.
-
- Works with python file objects that buffer readlines() when using e.g. `tail -f ... | python -u ...`.
-
- readline() may block once there is no more input available, or may return None for evermore.
-
- There is no fileno(), this is not pollable. At all. Don't even iterate on this with a timeout.
-
- TODO: it would be nice if this raised EOFError (to avoid bugs with polling this infinitely), but at least
- the first readlines() must complete normally
- """
-
- @classmethod
- def open (cls, path, mode='r', **opts) :
- log.debug("%s", path)
-
- return cls(open(path, mode), **opts)
-
- EOL = '\n'
-
- def __init__ (self, file) :
- log.debug("%s", file)
-
- self.file = file
-
- def readline (self) :
- """
- Reads a line from the file, without trailing \n.
-
- Returns None on EOF.
- """
-
- line = self.file.readline()
-
- if not line :
- line = None
- else :
- line = line.rstrip('\r\n')
-
- log.debug("%s", line)
-
- return line
-
- def readlines (self) :
- """
- Reads any available lines from the file.
- """
-
- while True :
- line = self.readline()
-
- if line is None :
- log.debug("%s: eof", self)
- return
- else :
- yield line
-
- __iter__ = readlines
-
- def writeline (self, line, eol=EOL) :
- """
- Write out line.
- """
-
- log.debug("%s", line)
-
- self.file.write(str(line))
- self.file.write(eol)
-
- def __call__ (self, *lines) :
- """
- Write out lines, and flush.
- """
-
- for line in lines :
- self.writeline(line)
-
- self.file.flush()
-
- def close (self) :
- """
- Close our file. Further operations raise ValueError.
- """
-
- log.debug("%s", self)
- self.file.close()
-
- def __str__ (self) :
- # XXX: optional attr?
- return self.file.name
--- a/pvl/syslog/filter.py Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,157 +0,0 @@
-import logging; log = logging.getLogger('pvl.syslog.filter')
-
-import re # XXX
-import os.path, fnmatch
-
-class SyslogFilter (object) :
- """
- Match syslog messages fields against given patterns.
- """
-
- @classmethod
- def build (cls, **filters) :
- """
- Match using given non-None fields.
- """
-
- # drop None's
- return cls(dict((attr, regex) for attr, regex in filters.iteritems() if regex is not None))
-
- def __init__ (self, filters) :
- """
- Match using given { field: regex }.
- """
-
- self.filters = filters
-
- def match_glob (self, attr, glob, value=None) :
- """
- Match prog as glob.
- """
-
- if not glob :
- return { attr: value }
-
- if not value :
- # require
- return False
-
- # normalize
- value = value.strip()
-
- # match
- if fnmatch.fnmatch(value, glob) :
- return { attr: value }
- else :
- return False
-
- match_facility = match_glob
-
- def match_prog (self, attr, glob, prog=None) :
- """
- Match prog as glob.
- """
-
- if prog :
- # normalize
- prog = prog.strip().lower()
-
- if prog.startswith('/') :
- # leaves postfix/* intact, but fixes /usr/bin/cron
- _, prog = os.path.split(prog)
-
- # match
- return self.match_glob(attr, glob, prog)
-
- REGEX_TYPE = type(re.compile(''))
-
- def match_regex (self, attr, regex, value=None) :
- """
- Match given value against given pattern.
- """
-
- if not regex :
- return { attr: value }
-
- if not value :
- # XXX: optional = match empty string?
- value = ''
- else :
- # normalize; XXX: unicode?
- value = str(value).strip()
-
- # match
- match = regex.match(value)
-
- if not match :
- return False
-
- # as match-values
- matches = { attr: match.group(0) } # whole match
- matches.update(match.groupdict())
-
- # TODO match.expand?
-
- return matches
-
- def filter (self, item) :
- """
- Match given item. Returns any matched values (including regexp capture groups) across all fields.
- """
-
- match = None
- matches = {}
-
- for attr in self.filters :
- # filter
- filter = self.filters[attr]
-
- # lookup match-func
- match = getattr(self, 'match_{attr}'.format(attr=attr), None)
-
- if match :
- pass
-
- elif isinstance(filter, self.REGEX_TYPE) :
- match = self.match_regex
-
- else :
- match = self.match_glob
-
- # apply match
- if attr in item :
- match = match(attr, filter, item[attr])
- else :
- match = match(attr, filter)
-
- log.debug("%s: %s", attr, match)
-
- if match :
- # match
- matches.update(match)
-
- else :
- # reject
- return
-
- # test last match
- if match is None :
- # empty filter -> all None
- return True
- else :
- return matches
-
- def process (self, items) :
- for item in items:
- match = self.filter(item)
-
- if match :
- yield item
-
- __call__ = process
-
- def __nonzero__ (self) :
- return bool(self.filters)
-
- def __repr__ (self) :
- return repr(self.filters)
--- a/pvl/syslog/parser.py Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,234 +0,0 @@
-import datetime, time
-import re
-
-import logging; log = logging.getLogger('pvl.syslog.parser')
-
-RFC3339_RE = re.compile(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(\.\d+)?(Z|[+-]\d{2}:\d{2})?')
-RFC3339_FMT = '%Y-%m-%dT%H:%M:%S'
-
-def rfc3339 (timestamp) :
- """
- RFC3339 timestamps as used in some syslog implementations.
-
- Returns a datetime in some random timezone, possibly localtime.
- """
-
- match = RFC3339_RE.match(timestamp)
-
- if not match :
- return None
-
- # parts
- dt = datetime.datetime.strptime(match.group(1), RFC3339_FMT)
- tz = match.group(2)
-
- # TODO: timezone?
- return dt
-
- if not tz :
- # XXX: localtime
- return dt
-
- elif tz == 'Z' :
- # UTC
- pass
-
- elif tz[0] in '+-' :
- hours, minutes = tz[1:].split(':')
- td = datetime.timedelta(hours=int(hours), minutes=int(minutes))
-
- if tz[0] == '-' :
- dt += td
- if tz[0] == '+' :
- dt -= td
- else :
- raise ValueError("Invalid timezone offset: %s" % timestamp)
-
- # XXX: UTC
- return dt
-
-RFC3164_RE = re.compile(r'\w{3} [0-9 ][0-9] \d{2}:\d{2}:\d{2}')
-RFC3164_FMT = '%b %d %H:%M:%S'
-RFC3164_PRE = '%Y ' # add missing year, assuming current
-
-def rfc3164 (timestamp) :
- """
- Traditional BSD Syslog timestamps.
-
- Returns a datetime assumed to be in localtime.
- """
-
- if not RFC3164_RE.match(timestamp) :
- return
-
- return datetime.datetime.strptime(time.strftime(RFC3164_PRE) + timestamp, RFC3164_PRE + RFC3164_FMT)
-
-class SyslogParser (object) :
- """
- Parse syslog lines in text format, as used in logfiles/fifos.
- """
-
- SEVERITIES = dict(enumerate((
- 'emerg',
- 'alert',
- 'crit',
- 'err',
- 'warning',
- 'notice',
- 'info',
- 'debug',
- )))
-
- FACILITIES = dict(enumerate((
- 'kern', # 0
- 'user', # 1
- 'mail', # 2
- 'daemon', # 3
- 'auth', # 4
- 'syslog', # 5
- 'lpr', # 6
- 'news', # 7
- 'uucp', # 8
- 'cron', # 9
- 'authpriv', # 10
- 'ftp', # 11
- 'ntp', # 12
- 'audit', # 13
- 'alert', # 14
- 'clock', # 15
- 'local0', # 16
- 'local1', # 17
- 'local2', # 18
- 'local3', # 19
- 'local4', # 20
- 'local5', # 21
- 'local6', # 22
- 'local7', # 23
- )))
-
- # default syslogd format
- SYSLOG_RE = re.compile(
- # the timestamp+hostname header
- # XXX: hostname may be missing
- # at least in Ubuntu 11.10 syslogd 'last message repeated 2 times'...
- r'(?:<(?P<pri>\d+|(?P<facility>\w+)\.(?P<severity>\w+))>)?'
- + r'(?P<timestamp>\w{3} [0-9 ][0-9] \d{2}:\d{2}:\d{2}|.+?) '
- + r'(?P<hostname>\S+)? '
-
- # the message, including possible tag/pid
- + r"(?P<message>(?P<tag>(?P<program>[^:\]]+)(?:\[(?P<pid>\d+)\])?: )?(?P<text>.*))\n?"
- )
-
- def __init__ (self, raw=False, facility=None, severity=None) :
- """
- Using given facility/severity as default.
- """
-
- self.raw = raw
- self.facility = facility
- self.severity = severity
-
- def parse_pri (self, match) :
- """
- Parse pri/facility/severity.
- """
-
- pri = match.group('pri')
- facility = match.group('facility') or self.facility
- severity = match.group('severity') or self.severity
-
- if pri and pri.isdigit() :
- pri = int(pri)
- facility, severity = divmod(pri, 8)
-
- return dict(
- pri = pri,
- severity = self.SEVERITIES.get(severity, severity),
- facility = self.FACILITIES.get(facility, facility)
- )
-
- def parse_timestamp (self, match) :
- """
- Parse timstamp from line into datetime.
- """
-
- timestamp = match.group('timestamp')
-
- # timestamp, in various formats
- try :
- return rfc3164(timestamp) or rfc3339(timestamp)
-
- except ValueError as ex:
- # skip it
- log.warning("timestamp: %s:", timestamp, exc_info=ex)
- return None
-
- def parse_prog (self, match) :
- """
- Parse prog from line.
- """
-
- prog = match.group('program')
-
- if prog :
- return prog
- else :
- # no tag
- return None
-
- def parse (self, line) :
- """
- Parse given input line into SyslogMessage.
- """
-
- # ignore whitespace
- line = line.strip()
-
- # timestamp?
- if self.raw :
- # from defaults
- return dict(
- timestamp = datetime.datetime.now(), # XXX: None?
- host = None,
- prog = None,
- pid = None,
- msg = line,
- )
-
- else :
- # parse
- match = self.SYSLOG_RE.match(line)
-
- if not match :
- log.warn("Unparseable syslog message: %r", line)
- return
-
- # parse
- item = dict(
- timestamp = self.parse_timestamp(match),
- host = match.group('hostname'),
- prog = self.parse_prog(match),
- pid = match.group('pid'),
- msg = match.group('text'),
- )
-
- # facility/severity prefix?
- item.update(self.parse_pri(match))
-
- return item
-
- def process (self, lines) :
- """
- Yield SyslogMessages from given series of lines.
- """
-
- for line in lines :
- item = self.parse(line)
-
- log.debug("%s", item)
-
- if item :
- yield item
-
- __call__ = process
-
--- a/pvl/syslog/rule.py Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,216 +0,0 @@
-from pvl.syslog.filter import SyslogFilter
-
-import re
-
-import optparse, sys
-import configobj
-
-import logging; log = logging.getLogger('pvl.syslog.rule')
-
-def parser (parser) :
- """
- Optparse option group.
- """
-
- syslog_rules = optparse.OptionGroup(parser, "Syslog rules")
-
- syslog_rules.add_option('--syslog-rules', metavar='FILE',
- help="Load syslog rules from file")
-
- return syslog_rules
-
-def apply (options) :
- """
- Build SyslogRules from options.
- """
-
- if options.syslog_rules :
- return SyslogRule.load(open(options.syslog_rules))
-
- else :
- return SyslogRule('default', formats={ 'text': '{msg}' })
-
-def merge (*dicts, **kwargs) :
- return dict((k, v) for d in (dicts + (kwargs, )) for k, v in d.iteritems())
-
-# TODO: combine SyslogRule/Rules into one heirarchial SyslogRule -type?
-class SyslogRule (object) :
- """
- A named SyslogFilter with sub-rules.
- """
-
- @classmethod
- def load (cls, file) :
- """
- Load SyslogRule from file.
- """
-
- config = configobj.ConfigObj(file)
-
- return cls.config_section(file.name, config)
-
- @classmethod
- def config_section (cls, name, section) :
- """
- Recursively load Syslogrules from config section.
- """
-
- rules = [cls.config_section(subsection, section[subsection]) for subsection in section.sections]
- attrs = dict((name, section[name]) for name in section.scalars)
-
- try :
- return cls.config(name, rules, **attrs)
-
- except ValueError as ex :
- raise ValueError("[%s] %s" % (name, ex))
-
- @classmethod
- def config_filters (cls, program=None, facility=None, pattern=None, **filters) :
- """
- Return filter expression from given attr/value in config.
- """
-
- # XXX: get rid of these special cases
- if facility :
- yield 'facility', facility # glob
-
- if program :
- yield 'prog', program # glob
-
- if pattern :
- filters['msg'] = pattern
-
- # generic
- for attr, value in filters.iteritems() :
- try :
- # regex
- yield attr, re.compile(value)
-
- except re.error as ex :
- raise ValueError("%s: %s" % (attr, ex))
-
- @classmethod
- def config (cls, name, rules=None, format=None, irk=None, **filters) :
- """
- Build SyslogRule from config options
- """
-
- if format is not None :
- format = { 'text': format }
- else :
- format = { }
-
- if irk :
- format['irk'] = irk
-
- filters = dict(cls.config_filters(**filters))
-
- filter = SyslogFilter(filters)
-
- log.debug("%s: %s %s", name, rules, filter)
-
- return cls(name, rules, filter, format)
-
- def __init__ (self, name, rules=None, filter=None, formats=None) :
- self.name = name
- self.rules = rules or [] # sub-rules
- self.filter = filter # SyslogFilter
- self.formats = formats or {}
-
- def match (self, item) :
- """
- Match item against our filter, returning match-dict (empty?) or None.
- """
-
- if self.filter :
- # filter
- matches = self.filter.filter(item)
-
- else :
- # match all, we probably have sub-rules that we're interested in
- return { }
-
- log.debug("%s: %s", self, matches)
-
- if matches :
- return matches
- else :
- # no match
- return None
-
- def format (self, item) :
- """
- Apply our output formats to given base apply, yielding (unique) attr, value tuples.
- """
-
- for attr, format in self.formats.iteritems() :
- value = format.format(**item)
-
- log.debug("%s: %s: %s", self, attr, value)
-
- yield attr, value
-
- def apply (self, item) :
- """
- Recursively match item against ourself and sub-rules. Returns applied output.
-
- Matches are passed down the tree, and applies are passed up.
- """
-
- log.debug("%s", self)
-
- # match rule -> matches
- matches = self.match(item)
-
- if matches is None :
- # skip
- return None, None, None
-
- # merge matches down
- item = merge(item, matches)
-
- # recursive sub-rules -> apply
- for rule in self.rules :
- try :
- # pass matches down
- match, rules, apply = rule.apply(item)
-
- except Exception as ex :
- log.exception("%s -> %s: %r", self, rule, item)
- continue # XXX: skip?
-
- if apply :
- # pass apply up
- break
- else :
- # self-match
- match, rules, apply = item, [], { }
-
- rules.append(self)
-
- # formats?
- if self.formats :
- # merge apply up
- apply = merge(dict(self.format(item)), apply)
-
- log.debug("%s: %s", '/'.join(str(rule) for rule in rules), apply)
-
- return match, rules, apply
-
- def __iter__ (self, items) :
- """
- Apply items against our rules, yielding any matches.
- """
-
- for item in items :
- match, rules, apply = self.apply(item)
-
- if apply :
- yield apply
-
- def __str__ (self) :
- return self.name
-
- def __repr__ (self) :
- return 'SyslogRule({self.name}, ...)'.format(self=self)
-
--- a/pvl/syslog/syslog.py Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,126 +0,0 @@
-"""
- Syslog handling.
-
- XXX: this belongs in pvl.syslog.source (apart from __iter__?)
-"""
-
-import select
-
-import logging; log = logging.getLogger('pvl.syslog.source')
-
-class SyslogSource (object) :
- """
- Process syslog input from a given source.
-
- Implements an iterable mainloop doing continuous polling on the source, using either a timeout or
- select():able source.
- """
-
- def __init__ (self, source, parser, filter, poll=None) :
- """
- Using given underlying line source.
-
- source - source to select() if poll=True
- poll - polling behaviour for source
- """
-
- self.source = source
- self.parser = parser
- self.filter = filter
-
- self.poll = poll
-
- def __iter__ (self) :
- """
- Yield available input.
-
- Raises EOFError if source has been closed.
- """
-
- return self.filter(self.parser(self.source))
-
- def fileno (self) :
- return self.source.fileno()
-
- def select (self, poll=None, reading=(), writing=()) :
- """
- Poll our source for input, with given polling behaviour:
- True - select() on source
- False - peek on source
- float - timeout in seconds
-
- Returns None on unknown, empty sequence on timeout, list of readables on select.
- """
-
- if poll is True :
- timeout = None # block
- reading += (self, ) # source.fileno()
-
- elif not poll :
- timeout = 0.0 # do not block
-
- else :
- timeout = float(poll)
-
- log.debug("%s (%s)", reading, timeout)
-
- # select
- readable, writeable, ex = select.select(reading, writing, [], timeout)
-
- log.debug("select: %s", readable)
-
- if readable :
- return readable
-
- elif reading :
- # timeout
- # XXX: this is the same as readable
- return ()
-
- else :
- # unknown
- return None
-
- def main (self, poll=None) :
- """
- Yield active syslog sources, polling as given.
-
- Returns once no more lines are available.
-
- XXX: reconnect? or source takes care of that..
- TODO: SIGINT -> finish iteration and return?
- """
-
- # from __init__
- # note that we must interpret poll here, since False -> never poll
- if poll is None :
- poll = self.poll
-
- # mainloop
- while True :
- # caller is responsible for reading them!
- yield self
-
- # poll
- if poll :
- # wait
- self.select(poll)
-
- else :
- # done
- break
-
- log.debug("exit")
-
- def close (self) :
- """
- Close the syslog source, if possible.
-
- """
-
- # XXX: do all sources support close?
- self.source.close()
-
- def __str__ (self) :
- return ' | '.join((str(self.source), str(self.parser), str(self.filter)))
-
--- a/pvl/syslog/tail.py Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,150 +0,0 @@
-"""
- Iterate over input lines in filesystem files.
-"""
-
-import os
-
-import logging; log = logging.getLogger('pvl.syslog.tail')
-
-class Tail (object) :
- """
- Follow a file on the filesystem, reading lines until EOF, and re-opening if replaced.
-
- Never blocks, no fileno() to poll. Just poll(timeout=POLL).
-
- Not writable.
- """
-
- POLL = 2.0
-
- def __init__ (self, path, skip=None, **opts) :
- log.debug("%s", path)
-
- self.path = path
- self.file = self.stat = None # closed
-
- self.open()
-
- if skip :
- self.skip()
-
- def _stat (self) :
- """
- Return a key identifying the file at our path.
- """
-
- st = os.stat(self.path)
-
- stat = st.st_dev, st.st_ino
-
- return stat
-
- def _open (self) :
- """
- Return the opened file.
- """
-
- return open(self.path, 'r')
-
- def open (self) :
- """
- Re-opens our file when closed.
-
- Raises ValueError if already open.
- """
-
- if self.file is None :
- # XXX: use fstat for "atomic" open+stat?
- self.file = self._open()
- self.stat = self._stat()
-
- log.debug("%s: %s: %s", self, self.file, self.stat)
-
- else :
- raise ValueError("%s: open already open tail" % (self, ))
-
- def changed (self) :
- """
- Has the underlying file changed?
- """
-
- return self._stat() != self.stat
-
- def readline (self) :
- """
- Reads a line from the file, without trailing \n.
-
- Returns None on EOF.
- """
-
- line = self.file.readline()
-
- if not line :
- line = None
- else :
- line = line.rstrip('\r\n')
-
- log.debug("%s", line)
-
- return line
-
- def readlines (self, eof_mark=False) :
- """
- Reads any available lines from the file.
-
- Reopens the file on EOF if the underlying file changed.
-
- eof_mark: yields a special None line when this happens.
- """
-
- while True :
- line = self.readline()
-
- if line is not None :
- yield line
-
- elif self.changed() :
- log.debug("EOF: reopen")
-
- self.close()
- self.open()
-
- if eof_mark :
- yield None # special token
-
- # keep going
- continue
-
- else :
- log.debug("EOF: wait")
- break
-
- __iter__ = readlines
-
- def skip (self) :
- """
- Skip any available lines.
- """
-
- log.debug("%s", self)
-
- for line in self.readlines() :
- pass
-
- def close (self) :
- """
- Close our file, if open. Further operations raise ValueError.
-
- log.warn's if already closed.
- """
-
- if self.file :
- log.debug("%s", self)
- self.file.close()
- self.file = None
- else :
- log.warn("%s: close on already closed tail", self)
-
- def __str__ (self) :
- return self.path
-
--- a/setup_irker.py Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,33 +0,0 @@
-#!/usr/bin/env python
-# encoding: utf-8
-
-__version__ = '0.4.1'
-
-from distutils.core import setup
-
-setup(
- name = 'pvl-irker',
- version = __version__,
- description = "Päivölä IRC utilities",
-
- author = "Tero Marttila",
- author_email = "terom@paivola.fi",
-
- packages = [
- 'pvl',
- 'pvl.irker',
- 'pvl.syslog',
- 'twisted.plugins',
- ],
-
- scripts = [
- 'bin/pvl.irker',
- 'bin/pvl.irk',
- 'bin/pvl.irker-syslog',
- ],
-
- data_files = [
- ('etc/pvl', [ 'etc/syslog.conf.dist' ] ),
- ],
-)
-
--- a/twisted/plugins/pvlirker_plugin.py Tue Feb 19 19:28:40 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,20 +0,0 @@
-from zope.interface import implements
-
-from twisted import plugin
-from twisted.application import service
-from twisted.python import log
-
-import pvl.irker.irker
-
-class ServiceMaker (object) :
- implements(service.IServiceMaker, plugin.IPlugin)
- tapname = 'pvl-irker'
- description = "Irker daemon"
- options = pvl.irker.irker.Options
-
- def makeService (self, options) :
- log.msg('makeService', options)
-
- return pvl.irker.irker.makeService(options)
-
-serviceMaker = ServiceMaker()