# HG changeset patch # User Tero Marttila # Date 1361295827 -7200 # Node ID ce931075b69e2ced6d8159a8d9cd74b9fadb2a3f # Parent b48dd0a9d7f60a0ee60e69884a1fb1953d99d86b import pvl.args,invoke from pvl-verkko diff -r b48dd0a9d7f6 -r ce931075b69e pvl/args.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pvl/args.py Tue Feb 19 19:43:47 2013 +0200 @@ -0,0 +1,115 @@ +""" + CLI argument handling; common stuff: logging +""" + +import optparse +import logging + +import pwd, grp, os, sys + +import logging; log = logging.getLogger('pvl.args') + +def parser (parser) : + """ + Return an optparse.OptionGroup. + """ + + general = optparse.OptionGroup(parser, "General options") + + general.add_option('-q', '--quiet', dest='loglevel', action='store_const', const=logging.ERROR, help="Less output") + general.add_option('-v', '--verbose', dest='loglevel', action='store_const', const=logging.INFO, help="More output") + general.add_option('-D', '--debug', dest='loglevel', action='store_const', const=logging.DEBUG, help="Even more output") + general.add_option('--log-file', help="Log to file") + general.add_option('--debug-module', action='append', metavar='MODULE', + help="Enable logging for the given logger/module name") + + general.add_option('--uid', help="Change uid") + general.add_option('--gid', help="Change gid") + + # defaults + parser.set_defaults( + logname = parser.prog, + loglevel = logging.WARN, + debug_module = [], + ) + + return general + +def options (**options) : + """ + Synthensise options. + """ + + return optparse.Values(options) + +def apply_setid (options, rootok=None) : + """ + Drop privileges if running as root. + + XXX: this feature isn't very useful (import-time issues etc), but in certain cases (syslog-ng -> python), + it's difficult to avoid this without some extra wrapper tool..? + """ + + # --uid -> pw + if not options.uid : + pw = None + elif options.uid.isdigit() : + pw = pwd.getpwuid(int(options.uid)) + else : + pw = pwd.getpwnam(options.uid) + + # --gid -> gr + if not options.gid and not pw : + gr = None + elif not options.gid : + gr = grp.getgrgid(pw.pw_gid) + elif options.gid.isdigit() : + gr = grp.getgrgid(str(options.gid)) + else : + gr = grp.getgrnam(options.gid) + + if gr : + # XXX: secondary groups? seem to get cleared + log.info("setgid: %s: %s", gr.gr_name, gr.gr_gid) + os.setgid(gr.gr_gid) + + if pw : + log.info("setuid: %s: %s", pw.pw_name, pw.pw_uid) + os.setuid(pw.pw_uid) + + elif os.getuid() == 0 : + if rootok : + log.info("running as root") + else : + log.error("refusing to run as root, use --uid 0 to override") + sys.exit(2) + +def apply (options, logname=None, rootok=True) : + """ + Apply the optparse options. + """ + + if logname : + prefix = options.logname + ': ' + else : + prefix = '' + + # configure + logging.basicConfig( + # XXX: log Class.__init__ as Class, not __init__? + format = prefix + '%(name)-20s: %(levelname)5s %(funcName)s: %(message)s', + level = options.loglevel, + filename = options.log_file, + ) + + # TODO: use --quiet for stdout output? + options.quiet = options.loglevel > logging.WARN + + if options.uid or options.gid or not rootok : + # set uid/gid + apply_setid(options, rootok=rootok) + + # enable debugging for specific targets + for logger in options.debug_module : + logging.getLogger(logger).setLevel(logging.DEBUG) + diff -r b48dd0a9d7f6 -r ce931075b69e pvl/invoke.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pvl/invoke.py Tue Feb 19 19:43:47 2013 +0200 @@ -0,0 +1,184 @@ +""" + Invoke external commands, with python kwargs -> options mangling. +""" + +import subprocess +import logging + +log = logging.getLogger('pvl.invoke') + +class InvokeError (Exception) : + def __init__ (self, cmd, exit, error) : + self.cmd = cmd + self.exit = exit + self.error = error + + def __str__ (self) : + return "{self.cmd} failed ({self.exit}): {self.error}".format(self=self) + +def invoke (cmd, args, stdin=None) : + """ + Invoke a command directly. + + stdin: + False -> passthrough stdin/stdout + None -> return lines of stdout + [lines] -> write lines on stdin, return lines of stdout + + Raises InvokeError on nonzero exit, otherwise log.warn's any stderr. + """ + + log.debug("{cmd} {args}".format(cmd=cmd, args=' '.join(args))) + + if stdin is False : + # keep process stdin/out + io = None + input = None + + elif stdin : + # return stdout, give stdin + io = subprocess.PIPE + input = '\n'.join(stdin) + '\n' + + else : + # return stdout + io = subprocess.PIPE + input = None + + p = subprocess.Popen([cmd] + args, stdin=io, stdout=io, stderr=io) + + # get output + # returns None if not io + stdout, stderr = p.communicate(input=input) + + if p.returncode : + # failed + raise InvokeError(cmd, p.returncode, stderr) + + elif stderr : + log.warning("%s: %s", cmd, stderr) + + if stdout : + return stdout.splitlines() + else : + return None + +import collections + +def process_opt (name, value) : + """ + Mangle from python keyword-argument dict format to command-line option tuple format. + + >>> process_opt('foo', True) + ('--foo',) + >>> process_opt('foo', 2) + ('--foo', '2') + >>> process_opt('foo', 'bar') + ('--foo', 'bar') + >>> process_opt('foo_bar', 'asdf') + ('--foo-bar', 'asdf') + + # multi + >>> process_opt('foo', ['bar', 'quux']) + ('--foo', 'bar', '--foo', 'quux') + >>> process_opt('foo', [False, 'bar', True]) + ('--foo', 'bar', '--foo') + + # empty + >>> process_opt('foo', False) + () + >>> process_opt('foo', None) + () + >>> process_opt('bar', '') + () + + Returns a tuple of argv items. + """ + + # mangle opt + opt = '--' + name.replace('_', '-') + + if value is True : + # flag opt + return (opt, ) + + elif not value : + # flag opt / omit + return ( ) + + elif isinstance(value, basestring) : + return (opt, value) + + elif isinstance(value, collections.Iterable) : + opts = (process_opt(name, subvalue) for subvalue in value) + + # flatten + return tuple(part for parts in opts for part in parts) + + else : + # as-is + return (opt, str(value)) + +def optargs (*args, **kwargs) : + """ + Convert args/options into command-line format + + >>> optargs('foo') + ['foo'] + >>> optargs(foo=True) + ['--foo'] + >>> optargs(foo=False) + [] + >>> optargs(foo='bar') + ['--foo', 'bar'] + """ + + ## opts + # process + opts = [process_opt(opt, value) for opt, value in kwargs.iteritems()] + + # flatten + opts = [str(part) for parts in opts for part in parts] + + ## args + args = [str(arg) for arg in args if arg] + + return opts + args + +# XXX: move to pvl.utils or something random? +def merge (*dicts, **kwargs) : + """ + Merge given dicts together. + + >>> merge(foo=1, bar=2) + {'foo': 1, 'bar': 2} + >>> merge(dict(foo=1), bar=2) + {'foo': 1, 'bar': 2} + >>> merge(dict(foo=1), bar=2, foo=3) + {'foo': 3, 'bar': 2} + >>> merge(dict(foo=1), dict(bar=2), foo=3) + {'foo': 3, 'bar': 2} + >>> merge(dict(bar=2), dict(foo=1), foo=3) + {'foo': 3, 'bar': 2} + + """ + + return dict((k, v) for d in (dicts + (kwargs, )) for k, v in d.iteritems()) + + +def command (cmd, *args, **opts) : + """ + Invoke a command with options/arguments, given via Python arguments/keyword arguments. + + Return stdout. + """ + + log.debug("{cmd} {opts} {args}".format(cmd=cmd, args=args, opts=opts)) + + # invoke + return invoke(cmd, optargs(*args, **opts)) + +if __name__ == '__main__': + import doctest + doctest.testmod() +