#!/usr/bin/python
"""
Manage rsync --link-dest based snapshots.
rsync's from <src> to <dst>/snapshots/YYYY-MM-DD-HH-MM-SS using --link-dest <dst>/current.
Updates symlink <dst>/current -> <dst>/snapshots/...
Then archives <dst>/current to <dst>/<period>/<date> using --link-dest.
"""
import pvl.args
from pvl.backup import __version__
from pvl.backup import rsync, invoke
import optparse, ConfigParser
import os, os.path, stat
import shutil, glob
import datetime
import logging
# module-level logger shared by everything in this file (fixed name 'main', not __name__)
log = logging.getLogger('main')
# command-line options, global state
# assigned by main(); Target.config_intervals() reads options.intervals through this global
options = None
def parse_options (argv, defaults) :
    """
        Parse command-line arguments.

            argv        - full argument vector; argv[0] is used as the program name
            defaults    - dict of option defaults, applied on top of the built-in ones;
                          must provide rsync_options, which is updated in place for
                          --include-from / --exclude-from

        Returns the (options, args) pair from optparse, after pvl.args.apply() has been
        called on the options and --clean has been expanded into both clean flags.
    """

    parser = optparse.OptionParser(
        prog        = argv[0],
        usage       = '%prog: [options] [ --config <path> | --target <path> [ --source <src> ] [ --interval <name> ] ]',
        version     = __version__,

        # module docstring
        # XXX: breaks multi-line descriptions..
        description = __doc__,
    )

    # option group provided by pvl.args
    parser.add_option_group(pvl.args.parser(parser, config=False))

    # rsync; the group is named rsync_group so that it does not shadow the rsync module
    rsync_group = optparse.OptionGroup(parser, "rsync Options")

    for flag, text in (
        ('--exclude-from',  "Read exclude rules from given file"),
        ('--include-from',  "Read include rules from given file"),
    ) :
        rsync_group.add_option(flag, metavar='FILE', help=text)

    parser.add_option_group(rsync_group)

    # global: cleanup flags
    for flag, text in (
        ('--clean-intervals',   "Clean out old interval links"),
        ('--clean-snapshots',   "Clean out unused snapshots (those not linked to)"),
        ('--clean',             "Clean out both intervals and snapshots"),
    ) :
        parser.add_option(flag, action='store_true', help=text)

    parser.add_option('-n', '--noop', action='store_true',
        help="Don't actually clean anything")

    # config-driven runs; --config may be given multiple times
    parser.add_option('-c', '--config', metavar='FILE/DIR', action='append',
        help="Load configuration file(s)")

    parser.add_option('-r', '--run', metavar='NAME',
        help="Run given set of targets, per config [run/...]")

    # manual target given on the command line
    parser.add_option('-T', '--target', metavar='PATH',
        help="Target path")

    parser.add_option('-s', '--source', metavar='RSYNC-PATH', dest='target_source', default=False,
        help="Run target backup from source in rsync-syntax")

    parser.add_option('--interval', metavar='NAME', action='append', dest='target_intervals',
        help="Run target with given given interval(s)")

    # built-in defaults first, then the caller's
    parser.set_defaults(config=[], target_intervals=[])
    parser.set_defaults(**defaults)

    # parse
    parsed, positional = parser.parse_args(argv[1:])

    # general logging/etc
    pvl.args.apply(parsed)

    # --clean implies both specific clean flags
    if parsed.clean :
        parsed.clean_intervals = parsed.clean
        parsed.clean_snapshots = parsed.clean

    # fold the include/exclude files into the rsync options
    for key in ('include-from', 'exclude-from') :
        filename = getattr(parsed, key.replace('-', '_'))

        if filename :
            parsed.rsync_options[key] = filename

    return parsed, positional
## Configuration
class ConfigError (Exception) :
    """
        Raised for invalid configuration: bad option values, or references to
        unknown intervals/targets.
    """
def process_config_name (name) :
    """
        Mangle a config-file name (section or option) into its python form,
        turning every '-' into '_'.
    """

    pieces = name.split('-')

    return '_'.join(pieces)
def parse_config (path, defaults) :
    """
        Load one config file and merge its sections into a nested dict.

            path        - config file to read
            defaults    - dict to start from; copied shallowly, so nested section
                          dicts that already exist in it are updated in place

        Section names are split on ':' (legacy) or '/' into a path of nested dicts;
        section and option names are mangled with process_config_name().

        Returns the merged config dict.
    """

    log.debug("loading config: %s", path)

    merged = dict(defaults)

    parser = ConfigParser.RawConfigParser()
    parser.read([path])

    for section in parser.sections() :
        mangled = process_config_name(section)

        log.debug("section: %s", mangled)

        # legacy configs use ':' for subsections, new ones '/'
        separator = ':' if ':' in mangled else '/'
        parts = mangled.split(separator)

        # descend to the dict for this section, creating levels as needed
        # XXX: sections are not in order, so we can't rely on the parent section being created before we handle the sub-section
        node = merged

        for part in parts :
            node = node.setdefault(part, {})

        # values
        for key, value in parser.items(section) :
            key = process_config_name(key)

            log.debug("section: %s: %s = %s", '/'.join(parts), key, value)

            node[key] = value

    log.debug("config: %s", merged)

    return merged
def config_bool (name, value, strict=True) :
    """
        Interpret a config string as a boolean (case-insensitive).

            name    - option name, only used in the error message
            value   - string to interpret
            strict  - when false, unrecognized values are returned unchanged
                      instead of raising

        Raises ConfigError for an unrecognized value in strict mode.
    """

    truthy = ('yes', 'true', '1', 'on')
    falsy = ('no', 'false', '0', 'off')

    lowered = value.lower()

    if lowered in truthy :
        return True

    if lowered in falsy :
        return False

    if not strict :
        # allow non-boolean values
        return value

    raise ConfigError("Unrecognized boolean value: {name} = {value}".format(name=name, value=value))
def config_int (name, value, default=False) :
    """
        Parse a config value as an integer.

            name    - option name, only used in the error message
            value   - raw value, normally a string from the config file
            default - returned when value is empty/None; the default of False
                      means "no default", in which case an empty value is an error

        Raises ConfigError if the value cannot be converted.
    """

    if not value and default is not False :
        # returning default value if one is given
        return default

    try :
        return int(value)

    except (TypeError, ValueError) :
        # TypeError covers non-string values such as None, which int() rejects
        # with a different exception than malformed strings
        raise ConfigError("Invalid integer value: {name} = {value}".format(name=name, value=value))
def config_list (name, value) :
    """
        Split a whitespace-separated config value into a list of words.

        The name argument is accepted for symmetry with the other config_*()
        helpers but is not used.
    """

    words = value.split()

    return words
def walk_symlinks (tree, ignore=False) :
    """
        Recursively find all symlinks below the given dir, yielding

            (dirpath, name, target)

        for each, where target is the raw os.readlink() value.

            tree    - directory to scan
            ignore  - optional collection of names to skip; only applied to the
                      entries of tree itself, not to subdirectories

        Symlinks are never followed. Passes through errors from os.listdir/os.lstat.
    """

    for entry in os.listdir(tree) :
        if ignore and entry in ignore :
            log.debug("%s: ignore: %s", tree, entry)
            continue

        entry_path = os.path.join(tree, entry)

        # stat symlink itself
        mode = os.lstat(entry_path).st_mode

        if stat.S_ISLNK(mode) :
            # found
            link_target = os.readlink(entry_path)

            log.debug("%s: link: %s -> %s", tree, entry, link_target)

            yield tree, entry, link_target

        elif stat.S_ISDIR(mode) :
            # recurse; the ignore set is deliberately not passed down
            log.debug("%s: tree: %s", tree, entry)

            for found in walk_symlinks(entry_path) :
                yield found

        else :
            log.debug("%s: skip: %s", tree, entry)
class Interval (object) :
    """
        An interval definition.

            name    - interval name; Target uses it as the directory name for the links
            format  - strftime format for the link names, or None to use the snapshot name
            keep    - number of links to keep when cleaning; a false value disables cleaning
    """

    @classmethod
    def from_config (cls, options, name,
        format,

        # deprecated
        keep    = None,
    ) :
        """
            Build from a global [intervals/...] config section.

            An empty format selects the "use snapshot name" behaviour (format=None).
            Raises ConfigError on an invalid keep value.
        """

        return cls(name,
            # magic to use snapshot name
            format  = format if format else None,
            keep    = config_int('keep', keep, default=None),
        )

    @classmethod
    def from_target_config (cls, name, base, arg) :
        """
            Build a per-target interval on top of the given base Interval.

                base    - Interval to inherit unset values from
                arg     - either a dict of {format, keep} overrides, or a bare
                          keep value (empty/None to inherit base.keep)

            Raises ConfigError on an invalid keep value.
        """

        if isinstance(arg, dict) :
            # full instance
            if 'format' in arg :
                # empty format means "use snapshot name", as in from_config()
                _format = arg['format'] or None
            else :
                _format = base.format if base else None

            if 'keep' in arg :
                # config values are strings; clean_interval needs an int
                _keep = config_int('keep', arg['keep'], default=None)
            else :
                _keep = base.keep if base else None

            return cls(name,
                format  = _format,
                keep    = _keep,
            )

        # partial instance with keep
        return cls(name,
            format  = base.format,
            keep    = config_int('keep', arg) if arg else base.keep,
        )

    def __init__ (self, name, format, keep) :
        self.name = name
        self.format = format
        self.keep = keep

    def __str__ (self) :
        return self.name
class SnapshotError (Exception) :
    """
        A non-rsync failure in Target.snapshot(), e.g. a leftover temp snapshot dir.
    """
class Target (object) :
    """
        A target run, i.e. a rsync-snapshot destination dir

        [target:...]

        Layout maintained under <path>:

            snapshots/<name>    - one directory per snapshot
            snapshots/tmp       - in-progress snapshot, renamed into place on success
            current             - symlink to the latest snapshots/<name>
            <interval>/<name>   - symlinks to ../snapshots/<name>
    """

    @classmethod
    def config_intervals (cls, name, intervals) :
        """
            Yield an Interval for each entry of the given {interval-name: arg} dict.

            The base definition for each is looked up from the module-global
            options.intervals. Raises ConfigError for an unknown interval name;
            as this is a generator, only once iterated.
        """

        for interval, arg in intervals.items() :
            # lookup base from options.intervals
            try :
                base = options.intervals[process_config_name(interval)]
            except KeyError:
                raise ConfigError("Unknown interval for [target/{target}]: {interval}".format(target=name, interval=interval))

            # parse
            yield Interval.from_target_config(interval, base, arg)

    # type() mapping for lvm_options
    LVM_OPTIONS = dict(
        wait    = float,
        size    = str,
    )

    @classmethod
    def from_config (cls, options, name,
        path            = None,
        source          = None,
        enable          = 'no',
        exclude_from    = None,

        # subsections
        intervals       = None,
        rsync_options   = None,
        lvm_options     = None,
    ) :
        """
            Build a Target from a [target/...] config section.

                options         - global options; rsync_options is used as the base
                name            - target name; also the default path
                path            - destination dir
                source          - rsync source; False is passed through to
                                  rsync.parse_source(), any other empty value is an error
                enable          - boolean config string
                intervals       - {interval-name: arg} dict, required
                rsync_options   - per-target overrides for options.rsync_options
                lvm_options     - options converted per LVM_OPTIONS and passed on
                                  to rsync.parse_source()

            Raises ConfigError on missing source/intervals or invalid values.
        """

        if not source and source is not False :
            raise ConfigError("Missing required option: source for [target/{name}]".format(name=name))

        # process lvm opts by LVM_OPTIONS types
        lvm_options = dict(
            (opt, cls.LVM_OPTIONS[opt](value)) for opt, value in (lvm_options or {}).items()
        )

        # parse source -> rsync.RSyncServer
        source_path = source
        source = rsync.parse_source(source, lvm_opts=lvm_options)

        log.debug("parse source: %r -> %s", source_path, source)

        # global defaults; copied so that overrides stay per-target
        _rsync_options = dict(options.rsync_options)

        if rsync_options :
            # override
            _rsync_options.update([
                # parse
                (option, config_bool(option, value, strict=False)) for option, value in rsync_options.items()
            ])

        if not intervals :
            raise ConfigError("Missing required [target/{name}/intervals]".format(name=name))

        # lookup intervals
        _intervals = list(cls.config_intervals(name, intervals))

        return cls(name,
            path            = path if path else name,
            source          = source,
            enable          = config_bool('enable', enable),
            intervals       = _intervals,
            rsync_options   = _rsync_options,
            exclude_from    = exclude_from,
        )

    def __init__ (self, name,
        path,
        source,
        enable          = False,
        intervals       = None,
        rsync_options   = None,

        # XXX: not implemented?
        exclude_from    = None
    ) :
        self.name = name

        self.path = path
        self.source = source
        self.enable = enable

        # fresh containers per instance; a shared default list/dict would leak
        # state between targets
        self.intervals = [] if intervals is None else intervals
        self.rsync_options = {} if rsync_options is None else rsync_options

        self.exclude_from = exclude_from

        # this snapshot?
        self.snapshots_dir = os.path.join(self.path, 'snapshots')

        # 'current' symlink
        self.current_path = os.path.join(self.path, 'current')

    def prepare (self, options) :
        """
            Prepare dir for usage: the target dir must already exist, the
            snapshots dir is created if missing.
        """

        if not os.path.exists(self.path) :
            raise Exception("Missing target dir: {path}".format(path=self.path))

        if not os.path.exists(self.snapshots_dir) :
            log.warning("Creating snapshots dir: %s", self.snapshots_dir)
            os.mkdir(self.snapshots_dir)

    def snapshot (self, options, now) :
        """
            Perform the rsync from our source to a new dir under self.snapshots_dir.

            The rsync runs into snapshots/tmp, which is renamed to the final
            snapshot name on success. If the 'current' link exists, its target
            is given to rsync as the 'link-dest' option.

            Returns the name of the new snapshot.
            Raises rsync.RsyncError or SnapshotError.
        """

        # new snapshot
        snapshot_name = now.strftime(options.snapshot_format)
        snapshot_path = os.path.join(self.snapshots_dir, snapshot_name)
        temp_path = os.path.join(self.snapshots_dir, 'tmp')

        if os.path.exists(temp_path) :
            raise SnapshotError("Old temp snapshot dir remains, please clean up: {path}".format(path=temp_path))

        # link-dest from current?
        if os.path.exists(self.current_path) :
            # real path to target
            target = os.readlink(self.current_path)
            target_path = os.path.join(os.path.dirname(self.current_path), target)

            log.debug("%s: link-dest: %s", self, target_path)

            # use as link-dest base; hardlinks unchanged files; target directory must be empty
            link_dest = target_path

        else :
            link_dest = None

        # log
        log.info("%s: %s -> %s <- %s", self, self.source, snapshot_path, link_dest)

        # build rsync options
        opts = dict(self.rsync_options)

        if link_dest :
            # rsync links absolute paths..
            opts['link-dest'] = os.path.abspath(link_dest)

        # to tempdir
        log.debug("rsync %s -> %s", self.source, temp_path)

        try :
            # run the rsync.RSyncServer; None as a placeholder will get replaced with the actual source
            self.source.execute(invoke.optargs(**opts), srcdst=(None, temp_path))

        except rsync.RsyncError as ex :
            # XXX: leaves temp_path in place, which must be removed or cleaned up..
            #      maybe use {snapshot_name}.tmp instead?
            log.warning("%s: rsync failed:", self, exc_info=ex)

            # run() handles this
            raise

        else :
            # move in to final name
            log.debug("rename %s -> %s", temp_path, snapshot_path)
            os.rename(temp_path, snapshot_path)

        return snapshot_name

    def interval (self, options, interval, now, snapshot_name) :
        """
            Update given <interval>/... links for this target, using the given new snapshot

            The link is named after the snapshot (interval.format is None) or
            after now formatted with interval.format. An existing link of that
            name is kept as-is.
        """

        dir_path = os.path.join(self.path, interval.name)

        if not os.path.exists(dir_path) :
            log.warning("%s/%s: Creating interval dir: %s", self, interval, dir_path)
            os.mkdir(dir_path)

        # name
        if interval.format is None :
            # per-snapshot
            name = snapshot_name

            log.debug("%s: using snapshot_name: %s", interval, name)

        else :
            # by date
            name = now.strftime(interval.format)

            log.debug("%s: using interval.format: %s -> %s", interval, interval.format, name)

        # path
        path_name = os.path.join(interval.name, name)
        path = os.path.join(self.path, path_name)

        log.debug("%s: processing %s", interval, path_name)

        # already there?
        # lexists, not exists: a dangling link still occupies the name, and
        # os.symlink() below would fail on it
        if os.path.lexists(path) :
            target = os.readlink(path)

            log.debug("%s: Keeping existing: %s -> %s", interval, name, target)

        else :
            # update
            target = os.path.join('..', 'snapshots', snapshot_name)

            log.info("%s/%s: %s -> %s", self, interval, name, target)
            log.debug("%s -> %s", path, target)

            os.symlink(target, path)

    def clean_interval (self, options, interval) :
        """
            Clean out given <interval>/... dir for this target.

            Keeps the interval.keep entries that sort last by name and unlinks
            the rest; nothing is removed with options.noop, or if keep is unset.
        """

        # path
        dir_path = os.path.join(self.path, interval.name)

        if not os.path.exists(dir_path) :
            log.warning("%s/%s: Skipping, no interval dir: %s", self, interval, dir_path)
            return

        # configured
        keep = interval.keep

        if not keep :
            log.info("%s/%s: Zero keep given, not cleaning up anything", self, interval)
            return

        # items to clean?
        items = os.listdir(dir_path)

        # sort newest -> oldest
        items.sort(reverse=True)

        log.debug("%s/%s: Have %d / %d items", self, interval, len(items), keep)
        log.debug("%s: items: %s", interval, ' '.join(items))

        if len(items) > keep :
            # select oldest ones
            clean = items[keep:]

            log.debug("%s/%s: cleaning out: %s", self, interval, ' '.join(clean))

            for item in clean :
                path = os.path.join(dir_path, item)

                log.info("%s/%s: %s", self, interval, path)

                if not options.noop :
                    log.debug("unlink: %s", path)
                    os.unlink(path)
                else :
                    log.debug("dryrun: %s", path)

    def clean_snapshots (self, options) :
        """
            Clean out all snapshots for this target not linked to from within our root.

            Fails without doing anything if unable to read the destination dir.
        """

        # real path to snapshots
        snapshots_path = os.path.realpath(os.path.abspath(self.snapshots_dir))
        log.debug("real snapshots_path: %s", snapshots_path)

        # set of found targets
        found = set()

        # walk all symlinks
        for dirpath, name, target in walk_symlinks(self.path, ignore=set(['snapshots'])) :
            # target dir
            target_path = os.path.realpath(os.path.join(dirpath, target))
            target_dir = os.path.dirname(target_path)
            target_name = os.path.basename(target_path)

            if target_dir == snapshots_path :
                log.debug("%s: found: %s -> %s", dirpath, name, target_name)
                found.add(target_name)

            else :
                log.debug("%s: ignore: %s -> %s", dirpath, name, target_path)

        # discover all snapshots
        snapshots = set(os.listdir(snapshots_path))

        # XXX: and ignore special names?
        snapshots = snapshots - set(['tmp'])

        ## compare
        used = snapshots & found
        unused = snapshots - found
        broken = found - snapshots

        log.debug("%s: found used=%d, unused=%d, broken=%d snapshot symlinks", self, len(used), len(unused), len(broken))
        log.debug("used=%s, unused=%s", used, unused)

        if broken :
            log.warning("%s: Found broken symlinks to snapshots: %s", self, ' '.join(broken))

        if unused :
            log.debug("%s: Cleaning out %d unused snapshots:", self, len(unused))

            for name in unused :
                path = os.path.join(snapshots_path, name)

                log.info("%s: %s", self, name)

                if not options.noop :
                    log.debug("rmtree: %s", path)

                    # nuke
                    shutil.rmtree(path)

                else :
                    log.debug("dry-run: %s", path)

    def run_snapshot (self, options, now) :
        """
            Run snapshot + update current.

            Returns the name of the new snapshot.
        """

        # initial rsync
        # may fail with RsyncError
        snapshot_name = self.snapshot(options, now)

        # update current
        log.debug("Updating current -> %s", snapshot_name)

        if os.path.islink(self.current_path) :
            # replace
            os.unlink(self.current_path)

        os.symlink(os.path.join('snapshots', snapshot_name), self.current_path)

        return snapshot_name

    def run_intervals (self, options, now, snapshot_name) :
        """
            Run our intervals.
        """

        if not self.intervals :
            log.warning("No intervals given")

        else :
            # maintain intervals
            log.debug("Updating %d intervals...", len(self.intervals))

            for interval in self.intervals :
                log.debug("%s", interval)

                # update
                self.interval(options, interval, now, snapshot_name)

    def run (self, options) :
        """
            Execute: prepare, optional cleanup, then snapshot + interval links
            if we have a source.

            Returns 0 on success, 1 if the rsync failed, 2 on a SnapshotError.
        """

        # prep
        self.prepare(options)

        # clean intervals?
        if options.clean_intervals:
            for interval in self.intervals :
                log.debug("%s: cleaning interval: %s", self, interval)

                self.clean_interval(options, interval)

        # clean snapshots?
        if options.clean_snapshots :
            log.debug("%s: cleaning snapshots...", self)

            self.clean_snapshots(options)

        # snapshot from source?
        if self.source :
            # timestamp for run
            now = datetime.datetime.now()

            log.debug("%s: started snapshot run at: %s", self, now)

            try :
                # snapshot + current
                snapshot_name = self.run_snapshot(options, now)

            except rsync.RsyncError as ex :
                # failed, don't update run intervals or such
                log.error("%s: snapshot rsync failed: %s", self, ex)

                return 1

            except SnapshotError as ex :
                # misc. failure
                log.error("%s: %s", self, ex)

                return 2

            # intervals?
            self.run_intervals(options, now, snapshot_name)

        # ok
        return 0

    def __str__ (self) :
        return self.name
def _parse_run_targets (options, config, run) :
    """
        Parse given run section from config into a series of target names to run.

            options - global options; options.targets holds the defined targets
            config  - parsed config dict
            run     - name of the [run/...] section to use

        Yields the name of each enabled target in the section.

        Raises ConfigError if the section does not exist, an enable value is not a
        boolean, or an enabled target is not defined. As this is a generator, errors
        are only raised once it is iterated.
    """

    # use the section we were asked for, and report a missing one as a config
    # error rather than a bare KeyError
    try :
        run_config = config['run'][process_config_name(run)]

    except KeyError :
        raise ConfigError("Unknown [run/{run}]".format(run=run))

    for target, enable in run_config.items() :
        # enabled?
        if not config_bool('enable', enable) :
            continue

        # check
        if target not in options.targets :
            raise ConfigError("Unknown [target/{target}] in [run/{run}]".format(target=target, run=run))

        yield target
def load_configs (configs, confglob='*.conf') :
    """
        Load configuration files from given list of config paths; supports loading a conf.d

            configs     - list of file or directory paths
            confglob    - glob pattern for files to pick up from a directory

        Yields config file paths in the order given; files from a directory are
        yielded sorted by name, so that which file overrides which does not depend
        on directory listing order.

        Raises Exception for paths that do not exist or are neither file nor directory.
    """

    for path in configs :
        log.debug("%s", path)

        if os.path.isdir(path) :
            # glob dir: $path/$glob
            # glob.glob() returns matches in arbitrary order
            for globpath in sorted(glob.glob(os.path.join(path, confglob))) :
                if os.path.exists(globpath) :
                    yield globpath
                else :
                    # e.g. a dangling symlink matching the pattern
                    raise Exception("Globbed file does not exist: {0}".format(globpath))

        elif os.path.isfile(path) :
            # normal file
            yield path

        elif os.path.exists(path) :
            raise Exception("Unrecognized config file type: {0}".format(path))

        else :
            raise Exception("Given config file does not exist: {0}".format(path))
def run (options, run_targets) :
    """
        Load the configuration, build the targets and run them.

            options     - parsed options; .targets is set, and .intervals and
                          .rsync_options are updated from the config, in place
            run_targets - list of target names to run; if empty, the targets of
                          --run are used, or failing that all defined targets

        All selected targets are run even if an earlier one fails.

        Returns an exit status:
            0   - everything ran ok
            2   - config file error or unknown target name (nothing further is run)
            otherwise the highest non-zero status returned by any Target.run()
    """

    # default config
    config = dict(
        rsync_options   = {},
        intervals       = {},
        targets         = {},
    )

    # config?
    for path in load_configs(options.config) :
        # load
        try :
            config = parse_config(path, config)

        except ConfigError as e:
            log.error("Configuration error: %s: %s", path, e)
            return 2

    # targets to run
    options.targets = {}

    # manual?
    if options.target :
        options.targets['console'] = Target.from_config(options, 'console',
            path        = options.target,
            source      = options.target_source,
            intervals   = dict((name, None) for name in options.target_intervals),
        )

    # intervals
    for name in config['intervals'] :
        interval_config = config['intervals'][name]

        # parse
        interval = Interval.from_config(options, name, **interval_config)

        log.debug("config interval: %s", name)

        # store
        options.intervals[name] = interval

    # rsync options
    for option in config['rsync_options'] :
        value = config['rsync_options'][option]

        # parse, allowing non-boolean values as well...
        value = config_bool(option, value, strict=False)

        log.debug("rsync option: %s=%s", option, value)

        # store
        options.rsync_options[option] = value

    # target definitions
    for name in config['targets'] :
        target_config = config['targets'][name]

        # parse
        target = Target.from_config(options, name, **target_config)

        log.debug("config target: %s", name)

        options.targets[name] = target

    # what targets?
    if run_targets :
        # keep as-is
        log.debug("Running given targets: %s", run_targets)

    if options.run :
        # given [run/...] definition..
        run_targets = list(_parse_run_targets(options, config, options.run))

        log.debug("Running %d given [run/%s] targets: %s", len(run_targets), options.run, run_targets)

    # worst status seen from any target, so that a failed snapshot is reflected
    # in our exit status
    status = 0

    # run
    if run_targets :
        log.debug("Running %d given targets...", len(run_targets))

        # run given ones
        for name in run_targets :
            try :
                # get
                target = options.targets[name]

            except KeyError:
                log.error("Unknown target given: %s", name)
                log.info("Defined targets: %s", ' '.join(options.targets))

                return 2

            # run
            log.info("%s", name)

            status = max(status, target.run(options) or 0)

    else :
        # all targets
        log.debug("Running all %d targets...", len(options.targets))

        # targets
        for name, target in options.targets.items() :
            log.info("%s", name)

            # run
            status = max(status, target.run(options) or 0)

    return status
def config_defaults () :
    """
        Build the built-in option defaults: the snapshot name format, the base
        rsync options and the predefined intervals.

        Returns a new dict on every call.
    """

    # name, strftime format for link names (None: use snapshot name), keep
    builtin_intervals = (
        ('recent',  None,       4),
        ('day',     '%Y-%m-%d', 7),
        ('week',    '%Y-%W',    4),
        ('month',   '%Y-%m',    4),
        ('year',    '%Y',       1),
    )

    # defined intervals, by name
    intervals = {}

    for name, fmt, keep in builtin_intervals :
        intervals[name] = Interval(name, format=fmt, keep=keep)

    return {
        # snapshots/ naming
        'snapshot_format': '%Y%m%d-%H%M%S',

        # rsync options, in invoke.optargs format
        'rsync_options': {
            'archive':          True,
            'hard-links':       True,
            'one-file-system':  True,
            'numeric-ids':      True,
            'delete':           True,
        },

        'intervals': intervals,
    }
def main (argv) :
    """
        Command-line entry point.

            argv    - full argument vector, including the program name

        Sets the module-global options, then runs the targets named by the
        positional arguments (all/--run targets if none are given).

        Returns the exit status from run(), or 3 if it raised an exception.
    """

    global options

    # option defaults
    defaults = config_defaults()

    # global options + args
    options, args = parse_options(argv, defaults)

    # args: filter targets
    # target names are stored in their mangled form, so mangle these the same way
    targets = [process_config_name(target) for target in args]

    try :
        # handle it
        return run(options, targets)

    except Exception as e:
        log.error("Internal error:", exc_info=e)
        return 3
if __name__ == '__main__' :
    # run as a script: main()'s return value becomes the process exit status
    import sys

    status = main(sys.argv)

    sys.exit(status)