bin/pvl.backup-snapshot
author Tero Marttila <tero.marttila@aalto.fi>
Mon, 28 Jul 2014 13:14:53 +0300
changeset 80 b332d99f988e
parent 65 462cecaa70d0
permissions -rwxr-xr-x
update for pvl.args; fixing -c/command and --config option duplicates
#!/usr/bin/python

"""
    Manage rsync --link-dest based snapshots.

    rsync's from <src> to <dst>/snapshots/YYYY-MM-DD-HH-MM-SS using --link-dest <dst>/current.

    Updates symlink <dst>/current -> <dst>/snapshots/...

    Then archives <dst>/current to <dst>/<period>/<date> using --link-dest.
"""

import pvl.args
from pvl.backup import __version__
from pvl.backup import rsync, invoke

import optparse, ConfigParser
import os, os.path, stat
import shutil, glob
import datetime
import logging

log = logging.getLogger('main')

# command-line options, global state
options = None

def parse_options (argv, defaults) :
    """
        Parse command-line arguments.

        argv        - full argument vector, with the program name as argv[0]
        defaults    - dict of option defaults, applied on top of the built-in defaults

        The parsed options are passed to pvl.args.apply(); --clean is expanded into
        --clean-intervals + --clean-snapshots, and --include-from/--exclude-from are stored
        into options.rsync_options.

        Returns (options, args) as given by optparse.
    """

    parser = optparse.OptionParser(
            prog        = argv[0],
            usage       = '%prog: [options] [ --config <path> | --target <path> [ --source <src> ] [ --interval <name> ] ]',
            version     = __version__,

            # module docstring
            # XXX: breaks multi-line descriptions..
            description = __doc__,
    )

    parser.add_option_group(pvl.args.parser(parser, config=False))

    # rsync; named so as not to shadow the imported rsync module
    rsync_group = optparse.OptionGroup(parser, "rsync Options")

    rsync_group.add_option('--exclude-from',       metavar='FILE',
        help="Read exclude rules from given file")

    rsync_group.add_option('--include-from',       metavar='FILE',
        help="Read include rules from given file")

    parser.add_option_group(rsync_group)

    # global
    parser.add_option('--clean-intervals',  action='store_true',
        help="Clean out old interval links")

    parser.add_option('--clean-snapshots',  action='store_true',
        help="Clean out unused snapshots (those not linked to)")

    parser.add_option('--clean',             action='store_true',
        help="Clean out both intervals and snapshots")

    parser.add_option('-n', '--noop',       action='store_true',
        help="Don't actually clean anything")

    #
    parser.add_option('-c', '--config',    metavar='FILE/DIR', action='append',    # multi
        help="Load configuration file(s)")

    parser.add_option('-r', '--run',        metavar='NAME',
        help="Run given set of targets, per config [run/...]")

    #
    parser.add_option('-T', '--target',    metavar='PATH',
        help="Target path")

    parser.add_option('-s', '--source',     metavar='RSYNC-PATH', dest='target_source', default=False,
        help="Run target backup from source in rsync-syntax")

    parser.add_option('--interval',         metavar='NAME', action='append', dest='target_intervals',
        help="Run target with given interval(s)")


    # defaults
    parser.set_defaults(
        config              = [],
        target_intervals    = [],
    )
    parser.set_defaults(**defaults)

    # parse
    options, args = parser.parse_args(argv[1:])

    # general logging/etc
    pvl.args.apply(options)

    # --clean implies both kinds of cleaning
    if options.clean :
        options.clean_intervals = options.clean_snapshots = options.clean

    if options.include_from :
        options.rsync_options['include-from'] = options.include_from

    if options.exclude_from :
        options.rsync_options['exclude-from'] = options.exclude_from

    return options, args

## Configuration
class ConfigError (Exception) :
    """
        Raised for missing, unknown or invalid configuration values.
    """

def process_config_name (name) :
    """
        Mangle a config file name into its python form, turning every '-' into a '_'.
    """

    parts = name.split('-')

    return '_'.join(parts)

def parse_config (path, defaults) :
    """
        Load the config file at the given path on top of the given defaults.

        Each section name is mangled using process_config_name() and split on ':' (legacy) or '/'
        into a path of nested dicts, which are created as needed. The section's options are then
        stored into the innermost dict, using their mangled names.

        The returned dict is a shallow copy of defaults, so any nested dicts are shared with it.
    """

    log.debug("loading config: %s", path)

    parser = ConfigParser.RawConfigParser()
    parser.read([path])

    config = dict(defaults)

    for section in parser.sections() :
        mangled = process_config_name(section)

        log.debug("section: %s", mangled)

        # legacy configs separate subsections using ':', new ones using '/'
        separator = ':' if ':' in mangled else '/'
        section_path = mangled.split(separator)

        # descend to the dict for this section
        # XXX: sections are not in order, so the parent section may not exist yet when handling a sub-section
        node = config

        for part in section_path :
            if part not in node :
                node[part] = {}

            node = node[part]

        # store the section's values
        for option, value in parser.items(section) :
            option = process_config_name(option)

            log.debug("section: %s: %s = %s", '/'.join(section_path), option, value)

            node[option] = value

    log.debug("config: %s", config)

    return config

def config_bool (name, value, strict=True) :
    """
        Parse a boolean config value, compared case-insensitively.

        name        - option name, used for the error message
        value       - string value to parse
        strict      - if False, unrecognized values are returned as-is instead of raising

        Raises ConfigError for an unrecognized value when strict.
    """

    lowered = value.lower()

    if lowered in ('yes', 'true', '1', 'on') :
        return True

    if lowered in ('no', 'false', '0', 'off') :
        return False

    if not strict :
        # allow non-boolean values
        return value

    raise ConfigError("Unrecognized boolean value: {name} = {value}".format(name=name, value=value))

def config_int (name, value, default=False) :
    """
        Parse an integer config value.

        name        - option name, used for the error message
        value       - value to convert using int()
        default     - returned in place of an empty value; False means that there is no default

        Raises ConfigError if the value cannot be converted, including a None value without a default.
    """

    if not value and default is not False :
        # returning default value if one is given
        return default

    try :
        return int(value)

    except (TypeError, ValueError) :
        # TypeError covers non-string values such as None
        raise ConfigError("Invalid integer value: {name} = {value}".format(name=name, value=value))

def config_list (name, value) :
    """
        Split a config value into a list of its whitespace-separated items.

        The name is not used.
    """

    items = value.split()

    return items

def walk_symlinks (tree, ignore=False) :
    """
        Recursively walk the given dir, yielding for each symlink found:

            (dirpath, name, target)

        Names listed in ignore are skipped. This only applies to the entries directly within tree,
        as ignore is not passed on when recursing into subdirs.

        Passes through errors from os.listdir/os.lstat.
    """

    for entry in os.listdir(tree) :
        if ignore and entry in ignore :
            log.debug("%s: ignore: %s", tree, entry)
            continue

        entry_path = os.path.join(tree, entry)

        # lstat the entry itself, so that symlinks are not followed
        mode = os.lstat(entry_path).st_mode

        if stat.S_ISLNK(mode) :
            # found
            link_target = os.readlink(entry_path)

            log.debug("%s: link: %s -> %s", tree, entry, link_target)

            yield tree, entry, link_target

        elif stat.S_ISDIR(mode) :
            # recurse
            log.debug("%s: tree: %s", tree, entry)

            for found in walk_symlinks(entry_path) :
                yield found

        else :
            log.debug("%s: skip: %s", tree, entry)


class Interval (object) :
    """
        An interval definition.

        name    - name of the interval; also its str() form
        format  - strftime format for naming items within the interval, or None to use the snapshot name
        keep    - number of items to keep when cleaning, or None
    """

    @classmethod
    def from_config (cls, options, name,
        format,

        # deprecated
        keep    = None,
    ) :
        """
            Build an Interval from its config section values.

            An empty format means that the snapshot name is used. The keep value is parsed using
            config_int(), which raises ConfigError for a non-integer value.

            The options are not used.
        """

        return cls(name,
            # magic to use snapshot name
            format  = format if format else None,
            keep    = config_int('keep', keep, default=None),
        )

    @classmethod
    def from_target_config (cls, name, base, arg) :
        """
            Build a per-target Interval on top of the given base Interval.

            name    - name of the interval
            base    - Interval to take the defaults from, or None
            arg     - either a dict with optional 'format' and 'keep' values, or a bare keep value

            Any given keep value is parsed using config_int(), which raises ConfigError for a
            non-integer value; a missing/empty keep falls back to that of the base.
        """

        base_format = base.format if base else None
        base_keep = base.keep if base else None

        if isinstance(arg, dict) :
            # full instance
            keep = arg.get('keep')

            return cls(name,
                format  = arg.get('format', base_format),

                # config values are strings, whereas clean_interval() compares keep against a count
                keep    = base_keep if keep in (None, '') else config_int('keep', keep),
            )
        else :
            # partial instance with keep
            return cls(name,
                format  = base_format,
                keep    = config_int('keep', arg) if arg else base_keep,
            )

    def __init__ (self, name, format, keep) :
        self.name = name
        self.format = format
        self.keep = keep

    def __str__ (self) :
        return self.name

class SnapshotError (Exception) :
    """
        Raised by Target.snapshot() for failures other than the rsync itself.
    """

class Target (object) :
    """
        A target run, i.e. a rsync-snapshot destination dir
            
        [target:...]
    """

    @classmethod
    def config_intervals (cls, name, intervals) :
        """
            Yield an Interval for each of the intervals configured for a target.

            name        - target name, used for the error message
            intervals   - dict of {interval name: config argument}

            Each interval must be defined in the global options.intervals (looked up by its
            mangled name), which provides the base for Interval.from_target_config().

            Raises ConfigError for an unknown interval.
        """

        for interval_name, arg in intervals.iteritems() :
            key = process_config_name(interval_name)

            # the base definition comes from options.intervals
            if key not in options.intervals :
                raise ConfigError("Unknown interval for [target/{target}]: {interval}".format(target=name, interval=interval_name))

            yield Interval.from_target_config(interval_name, options.intervals[key], arg)

    # type() mapping for lvm_options
    # from_config() calls the mapped type on the config value of each lvm option; an option missing from here fails the lookup
    LVM_OPTIONS = dict(
        wait    = float,
        size    = str,
    )

    @classmethod
    def from_config (cls, options, name,
        path            = None,
        source          = None,
        enable          = 'no',
        exclude_from    = None,

        # subsections
        intervals       = None,
        rsync_options   = None,
        lvm_options     = None,
    ) :
        """
            Build a Target from its config section values.

            options         - global options, providing the default .rsync_options
            name            - target name; also used as the path if no path is given
            path            - target dir
            source          - rsync source, parsed using rsync.parse_source(); required, unless explicitly False
            enable          - boolean config value
            exclude_from    - passed through to the Target as-is
            intervals       - dict of {interval name: config argument}; required
            rsync_options   - dict of per-target rsync options, overriding the global ones
            lvm_options     - dict of lvm options, converted per LVM_OPTIONS and passed to rsync.parse_source()

            Raises ConfigError for missing, unknown or invalid configuration.
        """

        if not source and source is not False :
            raise ConfigError("Missing required option: source for [target/{name}]".format(name=name))

        # process lvm opts by LVM_OPTIONS types
        _lvm_options = {}

        for opt, value in (lvm_options or {}).iteritems() :
            try :
                opt_type = cls.LVM_OPTIONS[opt]

            except KeyError :
                # report as a config error, not as a bare KeyError
                raise ConfigError("Unknown option for [target/{name}/lvm_options]: {opt}".format(name=name, opt=opt))

            _lvm_options[opt] = opt_type(value)

        # parse source -> rsync.RSyncServer
        source_path = source
        source = rsync.parse_source(source, lvm_opts=_lvm_options)

        log.debug("parse source: %r -> %s", source_path, source)

        # global defaults; copied, so that the per-target overrides do not leak into other targets
        _rsync_options = dict(options.rsync_options)

        if rsync_options :
            # override
            _rsync_options.update([
                # parse
                (option, config_bool(option, value, strict=False)) for option, value in rsync_options.iteritems()
            ])

        if not intervals :
            raise ConfigError("Missing required [target/{name}/intervals]".format(name=name))

        # lookup intervals
        _intervals = list(cls.config_intervals(name, intervals))

        return cls(name,
            path            = path if path else name,
            source          = source,
            enable          = config_bool('enable', enable),
            intervals       = _intervals,
            rsync_options   = _rsync_options,
            exclude_from    = exclude_from,
        )

    def __init__ (self, name,
        path,
        source, 
        enable          = False, 
        intervals       = [],
        rsync_options   = {},

        # XXX: not implemented?
        exclude_from    = None
    ) :
        self.name = name

        self.path = path
        self.source = source
        self.enable = enable
        
        self.intervals = intervals
        
        self.rsync_options = rsync_options
        self.exclude_from = exclude_from

        # this snapshot?
        self.snapshots_dir = os.path.join(self.path, 'snapshots')

        # 'current' symlink
        self.current_path = os.path.join(self.path, 'current')

    def prepare (self, options) :
        """
            Prepare dir for usage
        """

        if not os.path.exists(self.path) :
            raise Exception("Missing target dir: {path}".format(path=self.path))

        if not os.path.exists(self.snapshots_dir) :
            log.warn("Creating snapshots dir: %s", self.snapshots_dir)
            os.mkdir(self.snapshots_dir)

    def snapshot (self, options, now) :
        """
            Run the rsync from our source into a new snapshot within self.snapshots_dir.

            The rsync goes into a fixed 'tmp' dir, which is renamed to the snapshot name (now,
            formatted using options.snapshot_format) once the rsync has succeeded. If the 'current'
            path exists, the target of that symlink is passed to rsync as the link-dest.

            Returns the name of the new snapshot.

            Raises rsync.RsyncError or SnapshotError.
        """

        snapshot_name = now.strftime(options.snapshot_format)
        final_path = os.path.join(self.snapshots_dir, snapshot_name)
        work_path = os.path.join(self.snapshots_dir, 'tmp')

        if os.path.exists(work_path) :
            raise SnapshotError("Old temp snapshot dir remains, please clean up: {path}".format(path=work_path))

        # link-dest from current, if any
        # used as the link-dest base; hardlinks unchanged files; target directory must be empty
        link_dest = None

        if os.path.exists(self.current_path) :
            # the symlink target is relative to the dir containing the symlink
            link_dest = os.path.join(os.path.dirname(self.current_path), os.readlink(self.current_path))

            log.debug("%s: link-dest: %s", self, link_dest)

        log.info("%s: %s -> %s <- %s", self, self.source, final_path, link_dest)

        # per-run copy of the rsync options
        rsync_opts = dict(self.rsync_options)

        if link_dest :
            # rsync links absolute paths..
            rsync_opts['link-dest'] = os.path.abspath(link_dest)

        log.debug("rsync %s -> %s", self.source, work_path)

        try :
            # run the rsync.RSyncServer; None as a placeholder will get replaced with the actual source
            self.source.execute(invoke.optargs(**rsync_opts), srcdst=(None, work_path))

        except rsync.RsyncError as ex :
            # XXX:  leaves temp_path in place, which must be removed or cleaned up..
            #       maybe use {snapshot_name}.tmp instead?
            log.warn("%s: rsync failed:", self, exc_info=ex)

            # run() handles this
            raise

        # rsync ok, move in to final name
        log.debug("rename %s -> %s", work_path, final_path)
        os.rename(work_path, final_path)

        return snapshot_name

    def interval (self, options, interval, now, snapshot_name) :
        """
            Update given <interval>/... links for this target, using the given new snapshot.

            options         - not used
            interval        - Interval to update
            now             - datetime of the run, used to name the link per interval.format
            snapshot_name   - name of the new snapshot; also used as the link name if interval.format is None

            The interval dir is created if missing. An existing link of the same name is kept as-is,
            otherwise a new <interval>/<name> -> ../snapshots/<snapshot_name> symlink is created.
        """

        dir_path = os.path.join(self.path, interval.name)

        if not os.path.exists(dir_path) :
            log.warn("%s/%s: Creating interval dir: %s", self, interval, dir_path)
            os.mkdir(dir_path)

        # name
        if interval.format is None :
            # per-snapshot
            name = snapshot_name

            log.debug("%s: using snapshot_name: %s", interval, name)

        else :
            # by date
            name = now.strftime(interval.format)

            log.debug("%s: using interval.format: %s -> %s", interval, interval.format, name)

        # path
        path_name = os.path.join(interval.name, name)
        path = os.path.join(self.path, path_name)

        log.debug("%s: processing %s", interval, path_name)

        # already there?
        # lexists, as exists() is False for a dangling symlink, which would then fail the os.symlink()
        if os.path.lexists(path) :
            target = os.readlink(path)

            if not os.path.exists(path) :
                log.warn("%s/%s: Keeping broken link: %s -> %s", self, interval, name, target)

            log.debug("%s: Keeping existing: %s -> %s", interval, name, target)

        else :
            # update; relative to the interval dir
            target = os.path.join('..', 'snapshots', snapshot_name)

            log.info("%s/%s: %s -> %s", self, interval, name, target)
            log.debug("%s -> %s", path, target)

            os.symlink(target, path)



    def clean_interval (self, options, interval) :
        """
            Clean out the oldest items from the given <interval>/... dir of this target.

            The items are sorted by name in descending order, and everything past the first
            interval.keep items is unlinked. Nothing is cleaned if the interval dir is missing or
            if interval.keep is false; nothing is unlinked if options.noop is set.
        """

        interval_dir = os.path.join(self.path, interval.name)

        if not os.path.exists(interval_dir) :
            log.warn("%s/%s: Skipping, no interval dir: %s", self, interval, interval_dir)
            return

        # configured
        keep = interval.keep

        if not keep :
            log.info("%s/%s: Zero keep given, not cleaning up anything", self, interval)
            return

        # newest -> oldest
        entries = sorted(os.listdir(interval_dir), reverse=True)

        log.debug("%s/%s: Have %d / %d items", self, interval, len(entries), keep)
        log.debug("%s: items: %s", interval, ' '.join(entries))

        if not len(entries) > keep :
            # nothing to clean
            return

        # the oldest ones
        expired = entries[keep:]

        log.debug("%s/%s: cleaning out: %s", self, interval, ' '.join(expired))

        for entry in expired :
            entry_path = os.path.join(interval_dir, entry)

            log.info("%s/%s: %s", self, interval, entry_path)

            if options.noop :
                log.debug("dryrun: %s", entry_path)
            else :
                log.debug("rmtree: %s", entry_path)
                os.unlink(entry_path)

    def clean_snapshots (self, options) :
        """
            Remove all snapshots of this target that no symlink within our root points to.

            The symlinks are collected using walk_symlinks() before anything is removed, so any
            error from reading the dirs propagates without having removed anything. Symlinks to
            missing snapshots are warned about. Nothing is removed if options.noop is set.
        """

        # real path to snapshots
        snapshots_root = os.path.realpath(os.path.abspath(self.snapshots_dir))
        log.debug("real snapshots_path: %s", snapshots_root)

        # names of the snapshots that are linked to
        found = set()

        for dirpath, name, target in walk_symlinks(self.path, ignore=set(['snapshots'])) :
            # resolve the link target, which is relative to the dir containing the link
            resolved = os.path.realpath(os.path.join(dirpath, target))
            resolved_dir, resolved_name = os.path.split(resolved)

            if resolved_dir != snapshots_root :
                log.debug("%s: ignore: %s -> %s", dirpath, name, resolved)
                continue

            log.debug("%s: found: %s -> %s", dirpath, name, resolved_name)
            found.add(resolved_name)

        # all snapshots on disk
        # XXX: and ignore special names?
        snapshots = set(os.listdir(snapshots_root)) - set(['tmp'])

        ## compare
        used = snapshots & found
        unused = snapshots - found
        broken = found - snapshots

        log.debug("%s: found used=%d, unused=%d, broken=%d snapshot symlinks", self, len(used), len(unused), len(broken))
        log.debug("used=%s, unused=%s", used, unused)

        if broken :
            log.warn("%s: Found broken symlinks to snapshots: %s", self, ' '.join(broken))

        if not unused :
            return

        log.debug("%s: Cleaning out %d unused snapshots:", self, len(unused))

        for name in unused :
            snapshot_path = os.path.join(snapshots_root, name)

            log.info("%s: %s", self, name)

            if options.noop :
                log.debug("dry-run: %s", snapshot_path)
                continue

            log.debug("rmtree: %s", snapshot_path)

            # nuke
            shutil.rmtree(snapshot_path)

    def run_snapshot (self, options, now) :
        """
            Take a new snapshot, and point the 'current' symlink at it.

            Returns the name of the new snapshot. Passes through any error from snapshot().
        """

        # initial rsync; may fail with RsyncError
        snapshot_name = self.snapshot(options, now)

        log.debug("Updating current -> %s", snapshot_name)

        # relative to the target dir containing 'current'
        link_target = os.path.join('snapshots', snapshot_name)

        # replace any old symlink
        if os.path.islink(self.current_path) :
            os.unlink(self.current_path)

        os.symlink(link_target, self.current_path)

        return snapshot_name

    def run_intervals (self, options, now, snapshot_name) :
        """
            Update each of our intervals for the given new snapshot; warns if we have no intervals.
        """

        if not self.intervals :
            log.warn("No intervals given")
            return

        # maintain intervals
        log.debug("Updating %d intervals...", len(self.intervals))

        for item in self.intervals :
            log.debug("%s", item)

            self.interval(options, item, now, snapshot_name)

    def run (self, options) :
        """
            Execute this target: prepare, clean if asked to, then snapshot and update the intervals.

            The snapshot and interval updates are skipped if we have no source.

            Returns 0 on success, 1 if the snapshot rsync failed, or 2 for a SnapshotError.
        """

        self.prepare(options)

        if options.clean_intervals :
            for interval in self.intervals :
                log.debug("%s: cleaning interval: %s", self, interval)
                self.clean_interval(options, interval)

        if options.clean_snapshots :
            log.debug("%s: cleaning snapshots...", self)

            self.clean_snapshots(options)

        if not self.source :
            # nothing to snapshot from
            return 0

        # timestamp for run
        now = datetime.datetime.now()

        log.debug("%s: started snapshot run at: %s", self, now)

        try :
            # snapshot + current
            snapshot_name = self.run_snapshot(options, now)

        except rsync.RsyncError as ex :
            # failed, don't update run intervals or such
            log.error("%s: snapshot rsync failed: %s", self, ex)

            return 1

        except SnapshotError as ex :
            # misc. failure
            log.error("%s: %s", self, ex)

            return 2

        # new snapshot is in place
        self.run_intervals(options, now, snapshot_name)

        return 0

    def __str__ (self) :
        return self.name

def _parse_run_targets (options, config, run) :
    """
        Parse given run section from config into a series of target names to run.

        options     - global options, with the known .targets
        config      - loaded config dict, with the runs under config['run']
        run         - name of the [run/...] section to use

        Yields the name of each target enabled within the run section.

        Raises ConfigError for an unknown run section, an invalid enable value, or an unknown target.
    """

    try :
        # section names are mangled on load, so mangle the lookup as well
        run_config = config['run'][process_config_name(run)]

    except KeyError :
        raise ConfigError("Unknown [run/{run}]".format(run=run))

    for target, enable in run_config.iteritems() :
        # enabled?
        if not config_bool('enable', enable) :
            continue

        # check
        if target not in options.targets :
            raise ConfigError("Unknown [target/{target}] in [run/{run}]".format(target=target, run=run))

        yield target

def load_configs (configs, confglob='*.conf') :
    """
        Load configuration files from given list of config paths; supports loading a conf.d

        configs     - list of paths to config files and/or dirs
        confglob    - glob pattern for the files to load from within a dir

        Yields the path of each config file to load, in the order given; the files within a dir
        are yielded sorted by name, so that the load order is deterministic.

        Raises Exception for a path that does not exist or is neither a file nor a dir, and for
        a globbed path that does not exist.
    """

    for path in configs :
        log.debug("%s", path)

        if os.path.isdir(path) :
            # glob dir: $path/$glob
            # glob.glob() returns its results in arbitrary order
            for globpath in sorted(glob.glob(os.path.join(path, confglob))) :
                if os.path.exists(globpath) :
                    yield globpath
                else :
                    raise Exception("Globbed file does not exist: {0}".format(globpath))

        elif os.path.isfile(path) :
            # normal file
            yield path

        elif os.path.exists(path) :
            raise Exception("Unrecognized config file type: {0}".format(path))

        else :
            raise Exception("Given config file does not exist: {0}".format(path))

def run (options, run_targets) :
    """
        Load the configuration, build the targets, and run them.

        options         - parsed command-line options; .intervals and .rsync_options are updated from the config, and .targets is set
        run_targets     - list of target names to run

        The targets to run are those of the [run/...] section named by options.run if given,
        otherwise the given run_targets, or all defined targets if neither is given.

        Returns 2 for a configuration error or an unknown target; otherwise the highest status
        returned by the Target.run() of the targets run, which is 0 if they all succeeded.
    """

    # default config
    config = dict(
        rsync_options   = {},
        intervals       = {},
        targets         = {},
    )

    # config?
    for path in load_configs(options.config) :
        # load
        try :
            config = parse_config(path, config)
        except ConfigError as e:
            log.error("Configuration error: %s: %s", path, e)
            return 2

    # targets to run
    options.targets = {}

    try :
        # intervals
        for name in config['intervals'] :
            interval_config = config['intervals'][name]

            # parse
            interval = Interval.from_config(options, name, **interval_config)

            log.debug("config interval: %s", name)

            # store
            options.intervals[name] = interval

        # rsync options
        for option in config['rsync_options'] :
            value = config['rsync_options'][option]

            # parse, allowing non-boolean values as well...
            value = config_bool(option, value, strict=False)

            log.debug("rsync option: %s=%s", option, value)

            # store
            options.rsync_options[option] = value

        # manual?
        # built after the above, so that the configured intervals and rsync options apply to it as well
        if options.target :
            options.targets['console'] = Target.from_config(options, 'console',
                path        = options.target,
                source      = options.target_source,
                intervals   = dict((name, None) for name in options.target_intervals),
            )

        # target definitions
        for name in config['targets'] :
            target_config = config['targets'][name]

            # parse
            target = Target.from_config(options, name, **target_config)

            log.debug("config target: %s", name)

            options.targets[name] = target

        # what targets?
        if run_targets :
            # keep as-is
            log.debug("Running given targets: %s", run_targets)

        if options.run :
            # given [run/...] definition..
            run_targets = list(_parse_run_targets(options, config, options.run))

            log.debug("Running %d given [run/%s] targets: %s", len(run_targets), options.run, run_targets)

    except ConfigError as e :
        # these are raised while building the intervals/targets, not while reading the files
        log.error("Configuration error: %s", e)
        return 2

    # worst status of the targets run
    status = 0

    # run
    if run_targets :
        log.debug("Running %d given targets...", len(run_targets))

        # run given ones
        for name in run_targets :
            try :
                # get
                target = options.targets[name]

            except KeyError:
                log.error("Unknown target given: %s", name)
                log.info("Defined targets: %s", ' '.join(options.targets))
                return 2

            # run
            log.info("%s", name)

            # a failed target does not stop the remaining ones from running
            status = max(status, target.run(options))

    else :
        # all targets
        log.debug("Running all %d targets...", len(options.targets))

        # targets
        for name, target in options.targets.iteritems() :
            log.info("%s", name)

            # run
            status = max(status, target.run(options))

    return status

def config_defaults () :
    """
        Build the dict of built-in option defaults: the snapshot name format, the rsync options,
        and the predefined intervals by name.
    """

    # defined intervals
    intervals = [
        Interval('recent',  format=None,        keep=4),
        Interval('day',     format='%Y-%m-%d',  keep=7),
        Interval('week',    format='%Y-%W',     keep=4),
        Interval('month',   format='%Y-%m',     keep=4),
        Interval('year',    format='%Y',        keep=1),
    ]

    # rsync options, in invoke.optargs format
    rsync_options = {
        'archive':          True,
        'hard-links':       True,
        'one-file-system':  True,
        'numeric-ids':      True,
        'delete':           True,
    }

    return dict(
        # snapshots/ naming
        snapshot_format = '%Y%m%d-%H%M%S',

        rsync_options   = rsync_options,
        intervals       = dict((interval.name, interval) for interval in intervals),
    )

def main (argv) :
    """
        Command-line entry point.

        Parses argv into the global options, and runs the targets named by any remaining arguments.

        Returns the exit status: that of run(), or 3 if run() raised an exception.
    """

    global options

    # option defaults
    defaults = config_defaults()

    # global options + args
    options, args = parse_options(argv, defaults)

    # args: filter targets, mangled the same way as the config names
    targets = [process_config_name(target) for target in args]

    try :
        # handle it
        return run(options, targets)

    except Exception as e :
        log.error("Internal error:", exc_info=e)
        return 3



# script entry point: run main() with the full argument vector, and exit with its return value
if __name__ == '__main__' :
    import sys

    sys.exit(main(sys.argv))