pvl/invoke.py
author Tero Marttila <terom@paivola.fi>
Sat, 08 Mar 2014 14:37:20 +0200
changeset 30 1053e69664a6
parent 1 ce931075b69e
permissions -rw-r--r--
pvl.invoke: setenv
"""
    Invoke external commands, with python kwargs -> options mangling.
"""

import logging
import os
import subprocess

# module-level logger, used by invoke() and command() below
log = logging.getLogger('pvl.invoke')

class InvokeError (Exception) :
    """
        Failure of an invoked command.

        Attributes:
            cmd     - the command that was run
            exit    - the exit status it returned
            error   - the error output associated with the failure
    """

    def __init__ (self, cmd, exit, error) :
        self.cmd, self.exit, self.error = cmd, exit, error

    def __str__ (self) :
        # human-readable summary: "<cmd> failed (<exit>): <error>"
        template = "{self.cmd} failed ({self.exit}): {self.error}"

        return template.format(self=self)

def invoke (cmd, args, stdin=None, setenv=None) :
    """
        Run cmd with the given argument list and wait for it to exit.

        stdin selects how the child's stdio is wired up:
            False   -> child inherits our stdin/stdout/stderr, nothing is captured
            None    -> stdout/stderr are captured, nothing is written to stdin
            [lines] -> lines are newline-joined and written to stdin, stdout/stderr are captured

        setenv, if given, is a dict of variables applied on top of a copy of os.environ
        for the child process only.

        Returns the captured stdout split into lines, or None if there was no captured output.

        Raises InvokeError on nonzero exit; on success, any captured stderr is logged as a warning.
    """

    log.debug("{cmd} {args}".format(cmd=cmd, args=' '.join(args)))

    # stdin=False means passthrough; every other value captures via pipes
    pipe = None if stdin is False else subprocess.PIPE

    # only a non-empty set of lines is fed to the child, each line newline-terminated
    data = ('\n'.join(stdin) + '\n') if stdin else None

    # env=None lets the child inherit our environment unchanged
    child_env = None

    if setenv :
        child_env = os.environ.copy()
        child_env.update(setenv)

    argv = [cmd] + list(args)
    proc = subprocess.Popen(argv, stdin=pipe, stdout=pipe, stderr=pipe, env=child_env)

    # both are None when running in passthrough mode
    out, err = proc.communicate(input=data)

    if proc.returncode :
        raise InvokeError(cmd, proc.returncode, err)

    if err :
        # succeeded, but complained about something
        log.warning("%s: %s", cmd, err)

    return out.splitlines() if out else None

import collections

# Python 2/3 compatibility: string types are passed through as a single option value, never iterated
try :
    _string_types = (basestring, )
except NameError :
    _string_types = (str, bytes)

# the ABCs live in collections.abc on Python 3; the collections.Iterable alias was removed in 3.10
try :
    from collections.abc import Iterable as _Iterable
except ImportError :
    from collections import Iterable as _Iterable

def process_opt (name, value) :
    """
        Mangle from python keyword-argument dict format to command-line option tuple format.

        The option name is prefixed with '--', and underscores are replaced by dashes.

        >>> process_opt('foo', True)
        ('--foo',)
        >>> process_opt('foo', 2)
        ('--foo', '2')
        >>> process_opt('foo', 'bar')
        ('--foo', 'bar')
        >>> process_opt('foo_bar', 'asdf')
        ('--foo-bar', 'asdf')

        # multi
        >>> process_opt('foo', ['bar', 'quux'])
        ('--foo', 'bar', '--foo', 'quux')
        >>> process_opt('foo', [False, 'bar', True])
        ('--foo', 'bar', '--foo')

        # empty
        >>> process_opt('foo', False)
        ()
        >>> process_opt('foo', None)
        ()
        >>> process_opt('bar', '')
        ()

        Any falsy value is omitted, which also covers 0 and empty collections.

        Returns a tuple of argv items.
    """

    # mangle opt
    opt = '--' + name.replace('_', '-')

    if value is True :
        # flag opt
        return (opt, )

    if not value :
        # disabled flag / empty value: omit entirely
        return ( )

    if isinstance(value, _string_types) :
        # strings are iterable, so they must be handled before the generic iterable case
        return (opt, value)

    if isinstance(value, _Iterable) :
        # repeat the option once per item, flattening the per-item tuples
        return tuple(part for subvalue in value for part in process_opt(name, subvalue))

    # any other scalar: pass as its string form
    return (opt, str(value))

def optargs (*args, **kwargs) :
    """
        Convert args/options into command-line format.

        Keyword arguments are mangled into options via process_opt(), and come first.
        Positional arguments follow, converted with str(); falsy ones are dropped.

        >>> optargs('foo')
        ['foo']
        >>> optargs(foo=True)
        ['--foo']
        >>> optargs(foo=False)
        []
        >>> optargs(foo='bar')
        ['--foo', 'bar']

        Returns a list of argv strings.
    """

    ## opts
    # .items() rather than .iteritems(), so this works on both Python 2 and Python 3
    opts = [str(part) for opt, value in kwargs.items() for part in process_opt(opt, value)]

    ## args
    args = [str(arg) for arg in args if arg]

    return opts + args

# XXX: move to pvl.utils or something random?
def merge (*dicts, **kwargs) :
    """
        Merge given dicts together into a new dict.

        Later dicts override earlier ones, and keyword arguments override all of them.
        The given dicts are not modified.

        >>> merge(foo=1, bar=2) == {'foo': 1, 'bar': 2}
        True
        >>> merge(dict(foo=1), bar=2) == {'foo': 1, 'bar': 2}
        True
        >>> merge(dict(foo=1), bar=2, foo=3) == {'foo': 3, 'bar': 2}
        True
        >>> merge(dict(foo=1), dict(bar=2), foo=3) == {'foo': 3, 'bar': 2}
        True
        >>> merge(dict(bar=2), dict(foo=1), foo=3) == {'foo': 3, 'bar': 2}
        True
        >>> merge()
        {}
    """

    merged = { }

    # dict.update works on both Python 2 and Python 3, unlike .iteritems()
    for d in dicts :
        merged.update(d)

    # keyword arguments go last, so that they win
    merged.update(kwargs)

    return merged


def command (cmd, *args, **opts) :
    """
        Run cmd, building its command line from Python arguments.

        Keyword arguments become --options and positional arguments become plain
        arguments, as converted by optargs().

        Returns whatever invoke() returns for the command's stdout.
    """

    log.debug("{cmd} {opts} {args}".format(cmd=cmd, args=args, opts=opts))

    # build the argv tail first, then run it
    argv = optargs(*args, **opts)

    return invoke(cmd, argv)

if __name__ == '__main__':
    # when run as a script, execute the doctest examples embedded in this module's docstrings
    import doctest
    doctest.testmod()