"""
Filesystem path handling
"""
import os, os.path, errno, stat
import codecs, shutil
import itertools
from utils import lazy_load
class NodeError(Exception):
    """
    General exception class for errors associated with some specific node.

    Attributes:
        node    - the node (or other object) the error relates to
        message - the human-readable error message
    """

    def __init__(self, node, message):
        """
        Store the offending node and the error message.

        The message is also passed on to Exception, so `args` stays `(message, )`.
        """
        super(NodeError, self).__init__(message)

        self.node = node

        # Store the message explicitly instead of relying on the implicit
        # BaseException.message attribute, which is deprecated since
        # Python 2.6 and does not exist at all on Python 3.
        self.message = message

    def __str__(self):
        return "%s: %s" % (self.node, self.message)

    def __unicode__(self):
        return u"%s: %s" % (self.node, self.message)

    def __repr__(self):
        return "NodeError(%r, %r)" % (self.node, self.message)
class NodeErrno(NodeError):
    """
    A NodeError whose message is derived from an OS-level errno code.
    """

    def __init__(self, node, errno):
        """
        Look up the textual description of the given errno value and use it
        as the exception message for `node`.
        """
        description = os.strerror(errno)

        super(NodeErrno, self).__init__(node, description)
class Node(object):
    """
    A filesystem object is basically just a complicated representation of a path.

    On the plus side, it has a parent node and can handle unicode/binary paths:
    every node carries both the raw byte-string name used on the filesystem
    (`fsname`) and a human-readable unicode name (`name`).

    NOTE(review): `path`, `unicodepath` and `_stat` are wrapped with
    `lazy_load` from the project's `utils` module and are accessed as plain
    attributes; presumably the value is computed once and then cached -
    confirm against utils.lazy_load.
    """

    # the binary name
    fsname = None

    # the unicode name
    name = None

    def decode_fsname(self, fsname):
        """
        Decode the given raw byte string representing a filesystem name into an user-readable unicode name.

        Bytes that are not valid utf-8 are replaced rather than raising.

        XXX: currently just hardcoded as utf-8

        >>> Node(None, 'foo').decode_fsname('\xa5\xa6')
        u'\\ufffd\\ufffd'
        """
        return fsname.decode('utf-8', 'replace')

    def encode_name(self, name):
        """
        Returns a suitable fsname for the given unicode name or strict ASCII str

        XXX: currently just hardcoded as utf-8

        >>> Node(None, 'foo').encode_name(u'ab')
        'ab'
        """
        # this should fail for non-ASCII str
        return name.encode('utf-8')

    def __init__(self, parent, fsname=None, name=None, config=None):
        """
        Initialize the node with a parent and both name/fsname.

        If not given, fsname is encoded from name, or name decoded from fsname, using encode/decode_name.

        If parent is given, but both fsname and name are None, then this node will be cloned from the parent.

        The config is taken from the `config` argument if given, otherwise
        copied from the parent, otherwise left as None.

        Raises ValueError if there is neither a name nor a parent to clone.

        >>> Node(Root('/'), 'foo')
        Node('/', 'foo')
        >>> Node(Node(Root('/'), 'bar'))
        Node('/', 'bar')
        >>> Node(None, fsname='foo\xa5').name
        u'foo\\ufffd'
        >>> Node(None, name=u'foo').fsname
        'foo'
        """
        # fsname must not be an unicode string
        assert not fsname or isinstance(fsname, str)

        nameless = not fsname and not name

        if nameless and parent is not None:
            # no name given -> we're the same as parent
            self.parent = parent.parent
            self.config = parent.config
            self.fsname = parent.fsname
            self.name = parent.name
            return

        if nameless:
            # previously this surfaced as an obscure AttributeError from encode_name(None)
            raise ValueError("Node needs an fsname or a name, or a parent to clone")

        # store
        self.parent = parent

        # config, either as given, or copy from parent
        if config:
            self.config = config
        elif parent is not None:
            self.config = parent.config
        else:
            # XXX: no config
            self.config = None

        # fsname: as given, or encoded from the name
        self.fsname = fsname if fsname else self.encode_name(name)

        # name: as given, or decoded from the fsname
        self.name = unicode(name) if name else self.decode_fsname(fsname)

    def subnode(self, name):
        """
        Returns a Node object representing the given name behind this node.

        The name should either be a plain ASCII string or unicode object.

        >>> Node(Root('/'), 'foo').subnode('bar')
        Node('/foo', 'bar')
        """
        return Node(self, name=name)

    def nodepath(self):
        """
        Returns the path of nodes from this node to the root node, inclusive

        The nodes are yielded root-first, ending with this node.

        >>> list(Node(Root('/'), 'foo').subnode('bar').nodepath())
        [Root('/'), Node('/', 'foo'), Node('/foo', 'bar')]
        """
        # recursive generator
        for node in self.parent.nodepath():
            yield node

        yield self

    @lazy_load
    def path(self):
        """
        Return the machine-readable root-path for this node

        >>> Node(Root('/'), 'foo').subnode('bar').path
        '/foo/bar'
        >>> Node(Root('/'), name=u'foo').path
        '/foo'
        >>> Node(Root('/'), fsname='\\x01\\x02').path
        '/\\x01\\x02'
        """
        # build using parent path and our fsname
        # XXX: rewrite using nodepath?
        return os.path.join(self.parent.path, self.fsname)

    @lazy_load
    def unicodepath(self):
        """
        Return the human-readable root-path for this node

        >>> Node(Root('/'), 'foo').subnode('bar').unicodepath
        u'/foo/bar'
        >>> Node(Root('/'), name=u'foo').unicodepath
        u'/foo'
        >>> Node(Root('/'), fsname='\\x01\\x02').unicodepath
        u'/\\x01\\x02'
        """
        # build using parent unicodepath and our name; joining onto the
        # parent's *binary* path would mix bytes with unicode and blow up on
        # any non-ASCII parent path
        # XXX: rewrite using nodepath?
        return os.path.join(self.parent.unicodepath, self.name)

    def path_segments(self, unicode=True):
        """
        Return a series of single-level names describing the path from the root to this node.

        If `unicode` is given, then the returned items will be the unicode names, otherwise, the binary names.

        >>> list(Node(Root('/'), 'foo').subnode('bar').path_segments())
        [u'/', u'foo', u'bar']
        >>> list(Node(Root('/'), 'foo').subnode('bar').path_segments(unicode=False))
        ['/', 'foo', 'bar']
        """
        # parent's segments first, then our own
        for segment in self.parent.path_segments(unicode=unicode):
            yield segment

        yield self.name if unicode else self.fsname

    @lazy_load
    def _stat(self):
        """
        OS-level stats for this node's path.

        Returns None on ENOENT (node doesn't exist); any other OSError is re-raised.
        """
        try:
            # syscall
            return os.stat(self.path)

        except OSError as e:
            # trap ENOENT so that missing nodes show up as None; whether a
            # missing node is an error is decided by the caller (see stat())
            if e.errno == errno.ENOENT:
                return None

            raise

    def stat(self, soft=False):
        """
        Returns the os.stat struct for this node.

        If `soft` is given, returns None if this node doesn't exist;
        otherwise a missing node raises NodeErrno(ENOENT).

        The result is served from `_stat`, so it is only as fresh as that attribute.

        >>> Root('/').stat() is not None
        True
        >>> Root('/nonexistant').stat(soft=True) is None
        True
        """
        result = self._stat

        if result is not None:
            # got it
            return result

        if soft:
            # doesn't exist
            return None

        # not found
        raise NodeErrno(self, errno.ENOENT)

    def exists(self):
        """
        Tests if this node exists on the physical filesystem

        >>> Node(Root('.'), '.').exists()
        True
        >>> Node(Root('/'), 'nonexistant').exists()
        False
        """
        return self._stat is not None

    def is_dir(self):
        """
        Tests if this node represents a directory on the physical filesystem.

        Returns False for non-existant files.

        >>> Node(Root('/'), '.').is_dir()
        True
        >>> Root('/').subnode('dev').subnode('null').is_dir()
        False
        """
        st = self._stat

        return stat.S_ISDIR(st.st_mode) if st else False

    def is_file(self):
        """
        Tests if this node represents a normal file on the physical filesystem

        Returns False for non-existant files.

        >>> Node(Root('/'), '.').is_file()
        False
        >>> Root('/').subnode('dev').subnode('null').is_file()
        False
        """
        st = self._stat

        return stat.S_ISREG(st.st_mode) if st else False

    def test(self):
        """
        Tests that this node exists. Raises an error if not, otherwise, returns the node itself
        """
        if not self.exists():
            raise Exception("Filesystem node does not exist: %s" % self)

        return self

    def path_to(self, node):
        """
        Returns a relative Path from this node to the given node

        The path climbs from this node up to the deepest node shared by both
        nodepaths, and then descends to the given node.

        XXX: doctests
        """
        # get root-first node lists for both
        from_path = list(self.nodepath())
        to_path = list(node.nodepath())
        pivot = None

        # reduce common prefix, remembering the last shared node
        while from_path and to_path and from_path[0] == to_path[0]:
            from_path.pop(0)
            pivot = to_path.pop(0)

        # full path: up from ourself, through the pivot, down to the target
        path = itertools.chain(reversed(from_path), [pivot] if pivot else (), to_path)

        # build path
        return Path(*path)

    def path_from(self, node):
        """
        Returns a relative path to this node from the given node.

        This is the same as path_to, but just reversed.
        """
        return node.path_to(self)

    def __str__(self):
        return self.path

    def __unicode__(self):
        return self.unicodepath

    def __repr__(self):
        """
        Returns a str representing this node as its parent's path plus its own fsname

        Parentless nodes are shown with None in place of the parent path
        instead of raising.
        """
        parent_path = self.parent.path if self.parent is not None else None

        return "Node(%r, %r)" % (parent_path, self.fsname)

    def __eq__(self, other):
        """
        Compare for equality: same unicode name and equal parents
        """
        return isinstance(other, Node) and self.name == other.name and self.parent == other.parent

    def __cmp__(self, other):
        """
        Arbitrary comparisons between Nodes

        >>> cmp(Node(None, 'foo'), Node(None, 'foo'))
        0
        >>> cmp(Node(None, 'aaa'), Node(None, 'bbb'))
        -1
        >>> cmp(Node(None, 'bbb'), Node(None, 'aaa'))
        1
        >>> cmp(Node(Node(None, 'a'), 'aa'), Node(Node(None, 'a'), 'aa'))
        0
        >>> cmp(Node(Node(None, 'a'), 'aa'), Node(Node(None, 'a'), 'ab'))
        -1
        >>> cmp(Node(Node(None, 'a'), 'ab'), Node(Node(None, 'a'), 'aa'))
        1
        >>> cmp(Node(Node(None, 'a'), 'zz'), Node(Node(None, 'b'), 'aa'))
        -1
        >>> cmp(Node(Node(None, 'a'), 'aa'), Node(Node(None, 'b'), 'zz'))
        -1
        >>> cmp(Node(Node(None, 'z'), 'aa'), Node(Node(None, 'a'), 'zz'))
        1
        """
        if other is None:
            # arbitrary...
            return 1

        # NOTE(review): other's parent is ignored whenever we have no parent
        # ourselves, which makes the ordering asymmetric between parentless
        # and parented nodes - confirm whether that is intended
        other_parent = other.parent if self.parent else None

        return cmp((self.parent, self.name), (other_parent, other.name))
class Path(object):
    """
    A Path is a sequence of Nodes that form a path through a Node tree rooted at some Root.

    Each node must either be the parent or the child of the following node.

    The first and last nodes may be Files, but all other objects must be Directories.
    """

    def __init__(self, *nodes):
        """
        Initialize with the given node path.

        The node path must not be empty.
        """
        self.nodes = nodes

    def subpath(self, *nodes):
        """
        Returns a new path with the given node(s) appended
        """
        return Path(*itertools.chain(self.nodes, nodes))

    def path_segments(self, unicode=True):
        """
        Yields a series of physical path segments for this path.

        If `unicode` is given, node names are yielded as unicode names, otherwise as binary fsnames.

        File -> Directory :
            file.parent == dir  -> nothing

        Directory -> Directory :
            dir_1.parent == dir_2   -> '..'
            dir_1 == dir_2.parent   -> dir_2.name

        Directory -> File :
            file.parent == dir      -> file.name

        A path consisting of a single directory yields '.'; an empty path yields nothing.

        Raises Exception("invalid path: ...") if two consecutive directories are not parent/child.

        >>> root = Root('root'); Path(root, root.subfile('foo'))
        Path('foo')
        >>> root = Root('root'); Path(root, root.subdir('foo'), root.subdir('foo').subfile('bar'))
        Path('foo', 'bar')
        >>> root = Root('root'); Path(root.subfile('foo'), root)
        Path('.')
        >>> root = Root('root'); Path(root.subfile('foo'), root, root.subfile('bar'))
        Path('bar')
        >>> root = Root('root'); Path(root.subfile('foo'))
        Path('foo')
        """
        # XXX: this logic should be implemented as methods in Node

        # `node` is pre-bound so that an empty path does not hit an unbound local below
        prev = prev_last = node = None

        # output directory components
        for node in self.nodes:
            if prev is None:
                # ignore the first item for now
                pass

            elif isinstance(prev, File):
                # going from a file to its dir doesn't require anything
                assert isinstance(node, Directory) and prev.parent == node

            elif isinstance(node, File):
                # final target, must come from a directory
                assert node is self.nodes[-1] and (not prev or (isinstance(prev, Directory) and node.parent == prev))

            elif prev.parent == node:
                # going from a dir into the dir above it
                yield '..'

            elif node.parent == prev:
                # going from a dir into a dir underneath it
                yield node.name if unicode else node.fsname

            else:
                raise Exception("invalid path: %r" % (self.nodes, ))

            # chained together
            prev_last = prev
            prev = node

        if node is None:
            # empty path, nothing to output
            return

        # output final file/lone dir component
        if isinstance(node, File):
            # the last/only node is the final file target and must *always* be output
            yield node.name if unicode else node.fsname

        elif isinstance(node, Directory) and (prev_last is None or isinstance(prev_last, File)):
            # prev_last is None for a lone directory, which has no preceding file to check against
            if prev_last is not None:
                assert prev_last.parent == node

            # going from a file into its own directory (or a lone dir) is a direct reference
            yield '.'

    def __iter__(self):
        """
        Iterate over the nodes
        """
        return iter(self.nodes)

    def __unicode__(self):
        """
        Returns the unicode human-readable path
        """
        return os.path.join(*self.path_segments(unicode=True))

    def __str__(self):
        """
        Returns the binary machine-readable path
        """
        return os.path.join(*self.path_segments(unicode=False))

    def __repr__(self):
        return "Path(%s)" % ', '.join(repr(segment) for segment in self.path_segments(unicode=False))
class File(Node):
    """
    A file. Simple, eh?
    """

    @property
    def basename(self):
        """
        Returns the "base" part of this file's name, i.e. the filename without the extension
        """
        basename, _ = os.path.splitext(self.name)

        return basename

    @property
    def fileext(self):
        """
        Returns the file extension part of the file's name, without any leading dot
        """
        _, fileext = os.path.splitext(self.name)

        # strip leading .
        return fileext[1:]

    def matchext(self, ext_list):
        """
        Tests if this file's lowercased extension is part of the recognized list of extensions
        """
        return self.fileext.lower() in ext_list

    def test(self):
        """
        Tests that this file exists as a file. Raises an error if not, otherwise, returns itself
        """
        if not self.is_file():
            raise Exception("File does not exist: %s" % self)

        return self

    def open(self, mode='r', encoding=None, errors=None, bufsize=None):
        """
        Wrapper for open/codecs.open.

        With an `encoding`, the file is opened through codecs.open, otherwise
        through the builtin open (in which case `errors` is not used).
        `errors` and `bufsize` are only passed on when actually given, so that
        the underlying defaults apply otherwise.

        Raises an error if read_only mode is set and mode contains any of 'wa+'
        """
        if self.config.read_only and any((c in mode) for c in 'wa+'):
            raise Exception("Unable to open file for %s due to read_only mode: %s" % (mode, self))

        if encoding:
            # passing errors=None/buffering=None through would override
            # codecs.open's own defaults with invalid values
            options = {}

            if errors is not None:
                options['errors'] = errors

            if bufsize is not None:
                options['buffering'] = bufsize

            return codecs.open(self.path, mode, encoding, **options)

        if bufsize is not None:
            return open(self.path, mode, bufsize)

        return open(self.path, mode)

    def open_write(self, *args, **kwargs):
        """
        Open for write using open('w').

        Remaining arguments are passed on to open() after the mode.
        """
        return self.open('w', *args, **kwargs)

    def copy_from(self, file):
        """
        Replace this file with a copy of the given file with default permissions.

        Raises an error if read_only mode is set.

        XXX: accept mode
        """
        if self.config.read_only:
            raise Exception("Not copying file as read_only mode is set: %s -> %s" % (file, self))

        # perform the copy
        shutil.copyfile(file.path, self.path)

    def newer_than(self, file):
        """
        Returns True if both files exist, and this file is newer than the given file.

        Returns False if both exist and this file is not newer, and None if
        either of the files does not exist.
        """
        ours, theirs = self._stat, file._stat

        if ours and theirs:
            return ours.st_mtime > theirs.st_mtime

        return None

    def older_than(self, file):
        """
        Returns True if both files exist, and this file is older than the given file.
        """
        # mirror
        return file.newer_than(self)
class Directory(Node):
    """
    A directory is a node that contains other nodes.
    """

    # a list of (test_func, node_type) tuples for use by children() to build subnodes with
    NODE_TYPES = None

    def subdir(self, name, create=False):
        """
        Returns a Directory object representing the name underneath this dir.

        If the create option is given, the directory will be created if it does not exist. Note that this will
        raise an error if read_only mode is set
        """
        subdir = Directory(self, name=name)

        if create and not subdir.is_dir():
            # create it!
            subdir.mkdir()

        return subdir

    def subfile(self, name):
        """
        Returns a File object representing the name underneath this dir
        """
        return File(self, name=name)

    def test(self):
        """
        Tests that this dir exists as a dir. Raises an error if not, otherwise, returns itself
        """
        if not self.is_dir():
            raise Exception("Directory does not exist: %s" % self)

        return self

    def mkdir(self):
        """
        Create this directory with default permissions.

        This will fail if read_only mode is set

        XXX: mode argument
        """
        if self.config.read_only:
            # forbidden
            raise Exception("Unable to create dir due to read_only mode: %s" % self)

        # do it
        os.mkdir(self.path)

    def listdir(self, skip_dotfiles=True):
        """
        Yield a series of raw fsnames for nodes in this dir

        If skip_dotfiles is given, names that begin with a . are omitted.
        """
        return (fsname for fsname in os.listdir(self.path) if not (skip_dotfiles and fsname.startswith('.')))

    def subnodes(self, skip_dotfiles=True, sort=True):
        """
        Yield a series of Nodes contained in this dir.

        If skip_dotfiles is given, nodes that begin with a . are omitted.

        If `sort` is given, the returned nodes will be in sorted order (as a list),
        otherwise a lazy generator is returned.
        """
        nodes = (Node(self, fsname) for fsname in self.listdir(skip_dotfiles))

        if sort:
            return sorted(nodes)

        return nodes

    def __iter__(self, skip_dotfiles=True, sort=True):
        """
        Iterate over the Nodes contained in this dir, see subnodes().

        subnodes() returns a list when sorting, but __iter__ must hand back
        a real iterator, so the result is wrapped with iter().
        """
        return iter(self.subnodes(skip_dotfiles, sort))

    @property
    def root_path(self):
        """
        Build and return a relative path to the root of this dir tree

        XXX: move to node
        """
        # build using parent root_path
        return os.path.join('..', self.parent.root_path)

    def children(self):
        """
        Yield a series of Node subclasses representing the items in this dir.

        This uses self.NODE_TYPES to figure out what kind of sub-node object to build. This should be a list of
            (test_func, node_type)

        tuples, of which the first is a function that takes a Node as its sole argument, and returns a boolean.
        For the first test_func which returns True, a Node-subclass object is constructed using
        node_type.from_node if the type provides it, or else by cloning the node with node_type(node).

        Raises an Exception for a node that none of the test_funcs accept.

        XXX: never used
        """
        for node in self:
            # figure out what type to use
            for test_func, node_type in self.NODE_TYPES:
                if test_func(node):
                    # matches, build; types without a from_node factory are
                    # built through Node's clone-from-parent constructor
                    build = getattr(node_type, 'from_node', node_type)

                    yield build(node)

                    # first match wins, and keeps us out of the else clause
                    break

            else:
                # unknown file type!
                raise Exception("unrecongized type of file: %s" % node)
# default Directory.NODE_TYPES: each entry pairs a Node predicate with the
# Node subclass to build for matching nodes; assigned here because File and
# Directory must both be defined first
Directory.NODE_TYPES = [(Node.is_dir, Directory), (Node.is_file, File)]
class Root(Directory):
    """
    The Directory at the top of a node tree.

    It has no parent node; instead it is anchored at some 'real' filesystem
    path, and overrides the Node methods that would otherwise recurse into
    the parent.
    """

    # XXX: config needs a default
    def __init__(self, fspath, config=None):
        """
        Anchor the directory tree at the given 'real' path, which must be a raw str
        """
        # the whole anchor path stands in for this node's "name"; no parent
        super(Root, self).__init__(None, fspath)

        # keep whatever config we were handed, None included
        self.config = config

    def nodepath(self):
        """
        The node path of the root is just the root itself
        """
        return [self]

    @property
    def path(self):
        """
        The raw anchor path, as given
        """
        return self.fsname

    @property
    def unicodepath(self):
        """
        The decoded form of the anchor path
        """
        return self.name

    @property
    def root_path(self):
        """
        The relative path from the root to itself: an empty string
        """
        return ''

    def path_segments(self, unicode=True):
        """
        The only segment is our own: the unicode name if `unicode` is set, otherwise the binary fsname
        """
        if unicode:
            yield self.name
        else:
            yield self.fsname

    def __repr__(self):
        """
        Replaces Node.__repr__, which would need a parent path
        """
        return "Root(%r)" % (self.fsname, )
# testing: run the doctests embedded in this module when executed as a script
if __name__ == '__main__':
    import doctest

    doctest.testmod()