import pvl.args,invoke from pvl-verkko
authorTero Marttila <terom@paivola.fi>
Tue, 19 Feb 2013 19:43:47 +0200
changeset 1 ce931075b69e
parent 0 b48dd0a9d7f6
child 2 5a8a32cbc944
import pvl.args,invoke from pvl-verkko
pvl/args.py
pvl/invoke.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/args.py	Tue Feb 19 19:43:47 2013 +0200
@@ -0,0 +1,115 @@
+"""
+    CLI argument handling; common stuff: logging
+"""
+
+import optparse
+import logging
+
+import pwd, grp, os, sys
+
+import logging; log = logging.getLogger('pvl.args')
+
+def parser (parser) :
+    """
+        Return an optparse.OptionGroup.
+    """
+
+    general = optparse.OptionGroup(parser, "General options")
+
+    general.add_option('-q', '--quiet',     dest='loglevel', action='store_const', const=logging.ERROR, help="Less output")
+    general.add_option('-v', '--verbose',   dest='loglevel', action='store_const', const=logging.INFO,  help="More output")
+    general.add_option('-D', '--debug',     dest='loglevel', action='store_const', const=logging.DEBUG, help="Even more output")
+    general.add_option('--log-file',                                                                    help="Log to file")
+    general.add_option('--debug-module',    action='append', metavar='MODULE', 
+            help="Enable logging for the given logger/module name")
+    
+    general.add_option('--uid',             help="Change uid")
+    general.add_option('--gid',             help="Change gid")
+
+    # defaults
+    parser.set_defaults(
+        logname             = parser.prog,
+        loglevel            = logging.WARN,
+        debug_module        = [],
+    )
+ 
+    return general
+
+def options (**options) :
+    """
+        Synthensise options.
+    """
+
+    return optparse.Values(options)
+
+def apply_setid (options, rootok=None) :
+    """
+        Drop privileges if running as root.
+
+        XXX: this feature isn't very useful (import-time issues etc), but in certain cases (syslog-ng -> python),
+        it's difficult to avoid this without some extra wrapper tool..?
+    """
+
+    # --uid -> pw
+    if not options.uid :
+        pw = None
+    elif options.uid.isdigit() :
+        pw = pwd.getpwuid(int(options.uid))
+    else :
+        pw = pwd.getpwnam(options.uid)
+
+    # --gid -> gr
+    if not options.gid and not pw :
+        gr = None
+    elif not options.gid :
+        gr = grp.getgrgid(pw.pw_gid)
+    elif options.gid.isdigit() :
+        gr = grp.getgrgid(str(options.gid))
+    else :
+        gr = grp.getgrnam(options.gid)
+    
+    if gr :
+        # XXX: secondary groups? seem to get cleared
+        log.info("setgid: %s: %s", gr.gr_name, gr.gr_gid)
+        os.setgid(gr.gr_gid)
+
+    if pw :
+        log.info("setuid: %s: %s", pw.pw_name, pw.pw_uid)
+        os.setuid(pw.pw_uid)
+    
+    elif os.getuid() == 0 :
+        if rootok :
+            log.info("running as root")
+        else :
+            log.error("refusing to run as root, use --uid 0 to override")
+            sys.exit(2)
+
+def apply (options, logname=None, rootok=True) :
+    """
+        Apply the optparse options.
+    """
+
+    if logname :
+        prefix = options.logname + ': '
+    else :
+        prefix = ''
+
+    # configure
+    logging.basicConfig(
+        # XXX: log Class.__init__ as Class, not __init__?
+        format      = prefix + '%(name)-20s: %(levelname)5s %(funcName)s: %(message)s',
+        level       = options.loglevel,
+        filename    = options.log_file,
+    )
+
+    # TODO: use --quiet for stdout output?
+    options.quiet = options.loglevel > logging.WARN
+    
+    if options.uid or options.gid or not rootok :
+        # set uid/gid
+        apply_setid(options, rootok=rootok)
+
+    # enable debugging for specific targets
+    for logger in options.debug_module :
+        logging.getLogger(logger).setLevel(logging.DEBUG)
+    
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/invoke.py	Tue Feb 19 19:43:47 2013 +0200
@@ -0,0 +1,184 @@
+"""
+    Invoke external commands, with python kwargs -> options mangling.
+"""
+
+import subprocess
+import logging
+
+log = logging.getLogger('pvl.invoke')
+
+class InvokeError (Exception) :
+    def __init__ (self, cmd, exit, error) :
+        self.cmd = cmd
+        self.exit = exit
+        self.error = error
+
+    def __str__ (self) :
+        return "{self.cmd} failed ({self.exit}): {self.error}".format(self=self)
+
+def invoke (cmd, args, stdin=None) :
+    """
+        Invoke a command directly.
+        
+        stdin:      
+            False   -> passthrough stdin/stdout
+            None    -> return lines of stdout
+            [lines] -> write lines on stdin, return lines of stdout
+
+        Raises InvokeError on nonzero exit, otherwise log.warn's any stderr.
+    """
+    
+    log.debug("{cmd} {args}".format(cmd=cmd, args=' '.join(args)))
+
+    if stdin is False :
+        # keep process stdin/out
+        io = None
+        input = None
+
+    elif stdin :
+        # return stdout, give stdin
+        io = subprocess.PIPE
+        input = '\n'.join(stdin) + '\n'
+
+    else :
+        # return stdout
+        io = subprocess.PIPE
+        input = None
+
+    p = subprocess.Popen([cmd] + args, stdin=io, stdout=io, stderr=io)
+
+    # get output
+    # returns None if not io
+    stdout, stderr = p.communicate(input=input)
+
+    if p.returncode :
+        # failed
+        raise InvokeError(cmd, p.returncode, stderr)
+
+    elif stderr :
+        log.warning("%s: %s", cmd, stderr)
+    
+    if stdout :
+        return stdout.splitlines()
+    else :
+        return None
+
+import collections
+
+def process_opt (name, value) :
+    """
+        Mangle from python keyword-argument dict format to command-line option tuple format.
+
+        >>> process_opt('foo', True)
+        ('--foo',)
+        >>> process_opt('foo', 2)
+        ('--foo', '2')
+        >>> process_opt('foo', 'bar')
+        ('--foo', 'bar')
+        >>> process_opt('foo_bar', 'asdf')
+        ('--foo-bar', 'asdf')
+
+        # multi
+        >>> process_opt('foo', ['bar', 'quux'])
+        ('--foo', 'bar', '--foo', 'quux')
+        >>> process_opt('foo', [False, 'bar', True])
+        ('--foo', 'bar', '--foo')
+
+        # empty
+        >>> process_opt('foo', False)
+        ()
+        >>> process_opt('foo', None)
+        ()
+        >>> process_opt('bar', '')
+        ()
+
+        Returns a tuple of argv items.
+    """
+
+    # mangle opt
+    opt = '--' + name.replace('_', '-')
+
+    if value is True :
+        # flag opt
+        return (opt, )
+
+    elif not value :
+        # flag opt / omit
+        return ( )
+
+    elif isinstance(value, basestring) :
+        return (opt, value)
+
+    elif isinstance(value, collections.Iterable) :
+        opts = (process_opt(name, subvalue) for subvalue in value)
+
+        # flatten
+        return tuple(part for parts in opts for part in parts)
+
+    else :
+        # as-is
+        return (opt, str(value))
+
+def optargs (*args, **kwargs) :
+    """
+        Convert args/options into command-line format
+
+        >>> optargs('foo')
+        ['foo']
+        >>> optargs(foo=True)
+        ['--foo']
+        >>> optargs(foo=False)
+        []
+        >>> optargs(foo='bar')
+        ['--foo', 'bar']
+    """
+
+    ## opts
+    # process
+    opts = [process_opt(opt, value) for opt, value in kwargs.iteritems()]
+
+    # flatten
+    opts = [str(part) for parts in opts for part in parts]
+
+    ## args
+    args = [str(arg) for arg in args if arg]
+
+    return opts + args
+
+# XXX: move to pvl.utils or something random?
+def merge (*dicts, **kwargs) :
+    """
+        Merge given dicts together.
+        
+        >>> merge(foo=1, bar=2)
+        {'foo': 1, 'bar': 2}
+        >>> merge(dict(foo=1), bar=2)
+        {'foo': 1, 'bar': 2}
+        >>> merge(dict(foo=1), bar=2, foo=3)
+        {'foo': 3, 'bar': 2}
+        >>> merge(dict(foo=1), dict(bar=2), foo=3)
+        {'foo': 3, 'bar': 2}
+        >>> merge(dict(bar=2), dict(foo=1), foo=3)
+        {'foo': 3, 'bar': 2}
+
+    """
+
+    return dict((k, v) for d in (dicts + (kwargs, )) for k, v in d.iteritems())
+
+
+def command (cmd, *args, **opts) :
+    """
+        Invoke a command with options/arguments, given via Python arguments/keyword arguments.
+
+        Return stdout.
+    """
+    
+    log.debug("{cmd} {opts} {args}".format(cmd=cmd, args=args, opts=opts))
+
+    # invoke
+    return invoke(cmd, optargs(*args, **opts))
+
+if __name__ == '__main__':
+    import doctest
+    doctest.testmod()
+