pvl.invoke: copy+modify from pvl-ldap...
authorTero Marttila <terom@paivola.fi>
Sun, 20 Jan 2013 15:37:36 +0200
changeset 142 e5dafbc87cbb
parent 141 54512ece5465
child 143 fb48ba17ae3e
pvl.invoke: copy+modify from pvl-ldap...
pvl/invoke.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/invoke.py	Sun Jan 20 15:37:36 2013 +0200
@@ -0,0 +1,184 @@
+"""
+    Invoke external commands, with python kwargs -> options mangling.
+"""
+
+import subprocess
+import logging
+
+log = logging.getLogger('pvl.invoke')
+
+class InvokeError (Exception) :
+    def __init__ (self, cmd, exit, error) :
+        self.cmd = cmd
+        self.exit = exit
+        self.error = error
+
+    def __str__ (self) :
+        return "{self.cmd} failed ({self.exit}): {self.error}".format(self=self)
+
+def invoke (cmd, args, stdin=None) :
+    """
+        Invoke a command directly.
+        
+        stdin:      
+            False   -> passthrough stdin/stdout
+            None    -> return lines of stdout
+            [lines] -> write lines on stdin, return lines of stdout
+
+        Raises InvokeError on nonzero exit, otherwise log.warn's any stderr.
+    """
+    
+    log.debug("{cmd} {args}".format(cmd=cmd, args=' '.join(args)))
+
+    if stdin is False :
+        # keep process stdin/out
+        io = None
+        input = None
+
+    elif stdin :
+        # return stdout, give stdin
+        io = subprocess.PIPE
+        input = '\n'.join(stdin) + '\n'
+
+    else :
+        # return stdout
+        io = subprocess.PIPE
+        input = None
+
+    p = subprocess.Popen([cmd] + args, stdin=io, stdout=io, stderr=io)
+
+    # get output
+    # returns None if not io
+    stdout, stderr = p.communicate(input=input)
+
+    if p.returncode :
+        # failed
+        raise InvokeError(cmd, p.returncode, stderr)
+
+    elif stderr :
+        log.warning("%s: %s", cmd, stderr)
+    
+    if stdout :
+        return stdout.splitlines()
+    else :
+        return None
+
+import collections
+
+def process_opt (name, value) :
+    """
+        Mangle from python keyword-argument dict format to command-line option tuple format.
+
+        >>> process_opt('foo', True)
+        ('--foo',)
+        >>> process_opt('foo', 2)
+        ('--foo', '2')
+        >>> process_opt('foo', 'bar')
+        ('--foo', 'bar')
+        >>> process_opt('foo_bar', 'asdf')
+        ('--foo-bar', 'asdf')
+
+        # multi
+        >>> process_opt('foo', ['bar', 'quux'])
+        ('--foo', 'bar', '--foo', 'quux')
+        >>> process_opt('foo', [False, 'bar', True])
+        ('--foo', 'bar', '--foo')
+
+        # empty
+        >>> process_opt('foo', False)
+        ()
+        >>> process_opt('foo', None)
+        ()
+        >>> process_opt('bar', '')
+        ()
+
+        Returns a tuple of argv items.
+    """
+
+    # mangle opt
+    opt = '--' + name.replace('_', '-')
+
+    if value is True :
+        # flag opt
+        return (opt, )
+
+    elif not value :
+        # flag opt / omit
+        return ( )
+
+    elif isinstance(value, basestring) :
+        return (opt, value)
+
+    elif isinstance(value, collections.Iterable) :
+        opts = (process_opt(name, subvalue) for subvalue in value)
+
+        # flatten
+        return tuple(part for parts in opts for part in parts)
+
+    else :
+        # as-is
+        return (opt, str(value))
+
+def optargs (*args, **kwargs) :
+    """
+        Convert args/options into command-line format
+
+        >>> optargs('foo')
+        ['foo']
+        >>> optargs(foo=True)
+        ['--foo']
+        >>> optargs(foo=False)
+        []
+        >>> optargs(foo='bar')
+        ['--foo', 'bar']
+    """
+
+    ## opts
+    # process
+    opts = [process_opt(opt, value) for opt, value in kwargs.iteritems()]
+
+    # flatten
+    opts = [str(part) for parts in opts for part in parts]
+
+    ## args
+    args = [str(arg) for arg in args if arg]
+
+    return opts + args
+
+# XXX: move to pvl.utils or something random?
+def merge (*dicts, **kwargs) :
+    """
+        Merge given dicts together.
+        
+        >>> merge(foo=1, bar=2)
+        {'foo': 1, 'bar': 2}
+        >>> merge(dict(foo=1), bar=2)
+        {'foo': 1, 'bar': 2}
+        >>> merge(dict(foo=1), bar=2, foo=3)
+        {'foo': 3, 'bar': 2}
+        >>> merge(dict(foo=1), dict(bar=2), foo=3)
+        {'foo': 3, 'bar': 2}
+        >>> merge(dict(bar=2), dict(foo=1), foo=3)
+        {'foo': 3, 'bar': 2}
+
+    """
+
+    return dict((k, v) for d in (dicts + (kwargs, )) for k, v in d.iteritems())
+
+
+def command (cmd, *args, **opts) :
+    """
+        Invoke a command with options/arguments, given via Python arguments/keyword arguments.
+
+        Return stdout.
+    """
+    
+    log.debug("{cmd} {opts} {args}".format(cmd=cmd, args=args, opts=opts))
+
+    # invoke
+    return invoke(cmd, optargs(*args, **opts))
+
+if __name__ == '__main__':
+    import doctest
+    doctest.testmod()
+