--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/invoke.py Sun Jan 20 15:37:36 2013 +0200
@@ -0,0 +1,184 @@
+"""
+ Invoke external commands, with python kwargs -> options mangling.
+"""
+
+import subprocess
+import logging
+
+log = logging.getLogger('pvl.invoke')
+
+class InvokeError (Exception) :
+ def __init__ (self, cmd, exit, error) :
+ self.cmd = cmd
+ self.exit = exit
+ self.error = error
+
+ def __str__ (self) :
+ return "{self.cmd} failed ({self.exit}): {self.error}".format(self=self)
+
+def invoke (cmd, args, stdin=None) :
+ """
+ Invoke a command directly.
+
+ stdin:
+ False -> passthrough stdin/stdout
+ None -> return lines of stdout
+ [lines] -> write lines on stdin, return lines of stdout
+
+ Raises InvokeError on nonzero exit, otherwise log.warn's any stderr.
+ """
+
+ log.debug("{cmd} {args}".format(cmd=cmd, args=' '.join(args)))
+
+ if stdin is False :
+ # keep process stdin/out
+ io = None
+ input = None
+
+ elif stdin :
+ # return stdout, give stdin
+ io = subprocess.PIPE
+ input = '\n'.join(stdin) + '\n'
+
+ else :
+ # return stdout
+ io = subprocess.PIPE
+ input = None
+
+ p = subprocess.Popen([cmd] + args, stdin=io, stdout=io, stderr=io)
+
+ # get output
+ # returns None if not io
+ stdout, stderr = p.communicate(input=input)
+
+ if p.returncode :
+ # failed
+ raise InvokeError(cmd, p.returncode, stderr)
+
+ elif stderr :
+ log.warning("%s: %s", cmd, stderr)
+
+ if stdout :
+ return stdout.splitlines()
+ else :
+ return None
+
+import collections
+
+def process_opt (name, value) :
+ """
+ Mangle from python keyword-argument dict format to command-line option tuple format.
+
+ >>> process_opt('foo', True)
+ ('--foo',)
+ >>> process_opt('foo', 2)
+ ('--foo', '2')
+ >>> process_opt('foo', 'bar')
+ ('--foo', 'bar')
+ >>> process_opt('foo_bar', 'asdf')
+ ('--foo-bar', 'asdf')
+
+ # multi
+ >>> process_opt('foo', ['bar', 'quux'])
+ ('--foo', 'bar', '--foo', 'quux')
+ >>> process_opt('foo', [False, 'bar', True])
+ ('--foo', 'bar', '--foo')
+
+ # empty
+ >>> process_opt('foo', False)
+ ()
+ >>> process_opt('foo', None)
+ ()
+ >>> process_opt('bar', '')
+ ()
+
+ Returns a tuple of argv items.
+ """
+
+ # mangle opt
+ opt = '--' + name.replace('_', '-')
+
+ if value is True :
+ # flag opt
+ return (opt, )
+
+ elif not value :
+ # flag opt / omit
+ return ( )
+
+ elif isinstance(value, basestring) :
+ return (opt, value)
+
+ elif isinstance(value, collections.Iterable) :
+ opts = (process_opt(name, subvalue) for subvalue in value)
+
+ # flatten
+ return tuple(part for parts in opts for part in parts)
+
+ else :
+ # as-is
+ return (opt, str(value))
+
+def optargs (*args, **kwargs) :
+ """
+ Convert args/options into command-line format
+
+ >>> optargs('foo')
+ ['foo']
+ >>> optargs(foo=True)
+ ['--foo']
+ >>> optargs(foo=False)
+ []
+ >>> optargs(foo='bar')
+ ['--foo', 'bar']
+ """
+
+ ## opts
+ # process
+ opts = [process_opt(opt, value) for opt, value in kwargs.iteritems()]
+
+ # flatten
+ opts = [str(part) for parts in opts for part in parts]
+
+ ## args
+ args = [str(arg) for arg in args if arg]
+
+ return opts + args
+
+# XXX: move to pvl.utils or something random?
+def merge (*dicts, **kwargs) :
+ """
+ Merge given dicts together.
+
+ >>> merge(foo=1, bar=2)
+ {'foo': 1, 'bar': 2}
+ >>> merge(dict(foo=1), bar=2)
+ {'foo': 1, 'bar': 2}
+ >>> merge(dict(foo=1), bar=2, foo=3)
+ {'foo': 3, 'bar': 2}
+ >>> merge(dict(foo=1), dict(bar=2), foo=3)
+ {'foo': 3, 'bar': 2}
+ >>> merge(dict(bar=2), dict(foo=1), foo=3)
+ {'foo': 3, 'bar': 2}
+
+ """
+
+ return dict((k, v) for d in (dicts + (kwargs, )) for k, v in d.iteritems())
+
+
+def command (cmd, *args, **opts) :
+ """
+ Invoke a command with options/arguments, given via Python arguments/keyword arguments.
+
+ Return stdout.
+ """
+
+ log.debug("{cmd} {opts} {args}".format(cmd=cmd, args=args, opts=opts))
+
+ # invoke
+ return invoke(cmd, optargs(*args, **opts))
+
+if __name__ == '__main__':
+ import doctest
+ doctest.testmod()
+