"""
Low-level DMX channel output.
"""

import collections
import collections.abc
import logging

try:
    import serial
except ImportError:
    # pyserial is only needed by DMX.open(); DMX(io) itself works with any
    # object providing write()/flush()/read()/readline().
    serial = None

log = logging.getLogger('qmsk.dmx.control')


class DMXError(Exception):
    # Base class for errors reported by the controller.
    #
    # NOTE: this class intentionally has no docstring of its own. __str__ uses
    # the *class docstring* of the concrete subclass as a str.format() template,
    # filled in from the keyword arguments given to the constructor.

    def __init__(self, **kwargs):
        # Keyword arguments are kept for formatting the message in __str__.
        self.kwargs = kwargs

    def __str__(self):
        # Render the subclass docstring with the stored keyword arguments.
        return self.__doc__.strip().format(**self.kwargs)


class DMXCommandError(DMXError):
    """
        Command {cmd!r} failed: {out!r}
    """


class DMXUnknownCommandError(DMXError):
    """
        Unknown command: {cmd!r}
    """


class DMX(object):
    """
    Arduino-based DMX controller using src/hello-dmx.c over the serial port.

    Commands are sent as a single command byte-string followed by
    space-separated decimal arguments and terminated with a carriage return.
    """

    SERIAL = '/dev/arduino'
    SERIAL_BAUD = 9600

    @classmethod
    def open(cls, path, baud=SERIAL_BAUD):
        """
        Open the serial port at `path` with the given baud rate and wrap it.

        Raises ImportError if pyserial is not installed.
        """

        if serial is None:
            raise ImportError("pyserial is required for DMX.open()")

        return cls(serial.Serial(path, baud))

    def __init__(self, io):
        """
        Wrap an already-open file-like object.

        `io` must provide write(), flush(), read() and readline().
        """

        self.io = io

        # XXX: sync initial line
        self.sync()
        self.sync()

    def sync(self):
        """
        XXX: startup sync

        Send a bare carriage return, flush, and read back a single byte.
        """

        self.io.write(b'\r')
        self.io.flush()
        self.io.read(1)

    def _arg(self, arg):
        """
        Format given value as a protocol argument.

        A one-character str is converted to its code point; an int is used
        as-is. The result is the decimal value encoded as ASCII bytes.

        Raises ValueError for any other type, for a str whose length is not
        exactly one, and for values outside 0..255.
        """

        if isinstance(arg, str):
            # tuple-unpacking enforces exactly one character
            char, = arg
            value = ord(char)
        elif isinstance(arg, int):
            value = arg
        else:
            raise ValueError(arg)

        if not 0 <= value <= 255:
            raise ValueError(value)

        return str(value).encode('ascii')

    def __iter__(self):
        """
        Read command responses back, one line at a time, forever.
        """

        while True:
            yield self.io.readline()

    def __call__(self, cmd, *args, **opts):
        """
        Send a command: <cmd> [<arg> [...]]

        cmd     - command as bytes, e.g. b's'
        args    - values accepted by _arg()
        poll    - (keyword, default True) read one response line after sending

        Other keyword options are ignored.

        Raises DMXCommandError if the response contains b'!', and
        DMXUnknownCommandError if it contains b'?'. With poll=False no
        response is read and no check is made.
        """

        # XXX:
        poll = opts.pop('poll', True)

        out = cmd + b' ' + b' '.join(self._arg(arg) for arg in args)

        log.info("%s", out)

        self.io.write(out)
        self.io.write(b'\r')
        self.io.flush()

        if not poll:
            # nothing was read back, so there is no response to inspect
            return

        # first line yielded by __iter__, i.e. a single readline()
        ret = next(iter(self))

        if b'!' in ret:
            raise DMXCommandError(cmd=out, out=ret)

        elif b'?' in ret:
            raise DMXUnknownCommandError(cmd=cmd)

    def clear(self, **opts):
        """
        Set dmx = [ ]

        i.e. start transmitting zero-length DMX packets.
        For most lights, this seems to be equivalent to losing the DMX signal, and they retain their old state.
        """

        self(b'c', **opts)

    def zero(self, **opts):
        """
        Set dmx = [0, ...]

        Uses the maximum DMX packet length available.
        """

        self(b'z', **opts)

    def out(self, *values, **opts):
        """
        Set dmx = (value, ...)
        """

        self(b'o', *values, **opts)

    def set(self, start, *values, **opts):
        """
        Set dmx[start:] = value
        """

        self(b's', start, *values, **opts)

    def fill(self, start, end, *values, **opts):
        """
        Set dmx[start:end] to repetitions of (value, ...)
        """

        self(b'f', start, end, *values, **opts)

    def range(self, start, stop, step, value, **opts):
        """
        Set dmx[start:end:step] = value
        """

        self(b'r', start, stop, step, value, **opts)

    def __setitem__(self, index, value):
        """
        Magic indexing.

        dmx[i] = v              -> set(i, v)
        dmx[i] = (v, ...)       -> set(i, v, ...)
        dmx[a:] = ...           -> set(a, ...)
        dmx[a:b] = ...          -> fill(a, b, ...)
        dmx[a:b:s] = v          -> range(a, b, s, v)

        Raises IndexError for slices without a (non-zero) start.
        """

        # collections.abc.Sequence: the bare collections.Sequence alias was
        # removed in Python 3.10.
        if isinstance(value, collections.abc.Sequence):
            values = tuple(value)
        else:
            values = (value, )

        if not isinstance(index, slice):
            # simple set
            self.set(index, *values)
            return

        # NOTE(review): slice bounds are tested by truthiness, so a bound of 0
        # is treated the same as a missing one (e.g. dmx[0:4] is rejected as an
        # invalid slice) -- confirm against the firmware's channel numbering.
        if index.start and index.stop and index.step:
            # XXX: single
            self.range(index.start, index.stop, index.step, value)

        elif index.start and index.stop:
            self.fill(index.start, index.stop, *values)

        elif index.start:
            self.set(index.start, *values)

        else:
            raise IndexError("invalid slice: %s" % (index, ))