terom@74: import collections terom@74: import logging; log = logging.getLogger('qmsk.dmx.control') terom@74: import serial terom@74: terom@83: """ terom@83: Low-level DMX channel output. terom@83: """ terom@83: terom@74: class DMXError (Exception) : terom@74: def __init__ (self, **kwargs) : terom@74: self.kwargs = kwargs terom@74: terom@74: def __str__ (self) : terom@74: return self.__doc__.strip().format(**self.kwargs) terom@74: terom@74: class DMXCommandError (DMXError) : terom@74: """ terom@74: Command {cmd!r} failed: {out!r} terom@74: """ terom@74: terom@74: class DMXUnknownCommandError (DMXError) : terom@74: """ terom@74: Unknown command: {cmd!r} terom@74: """ terom@74: terom@74: class DMX (object) : terom@74: """ terom@74: Arudino-based DMX controller using src/hello-dmx.c over the serial port. terom@74: """ terom@74: terom@74: SERIAL = '/dev/arduino' terom@74: SERIAL_BAUD = 9600 terom@74: SERIAL_TIMEOUT = 1.0 terom@74: terom@74: @classmethod terom@74: def open (cls, path, baud=SERIAL_BAUD, timeout=SERIAL_TIMEOUT) : terom@74: return cls(serial.Serial(path, baud, timeout=timeout)) terom@74: terom@74: def __init__ (self, io) : terom@74: self.io = io terom@74: terom@74: # XXX: bug terom@74: self.io.write('\r') terom@74: self.io.flush() terom@74: self.io.read(1) terom@74: terom@74: def _arg (self, arg) : terom@74: if isinstance(arg, str) : terom@74: value, = arg terom@74: value = ord(value) terom@74: elif isinstance(arg, int) : terom@74: value = arg terom@74: else : terom@74: raise ValueError(arg) terom@74: terom@74: if 0 <= value <= 255 : terom@74: return str(value) terom@74: else : terom@74: raise ValueError(value) terom@74: terom@74: def __call__ (self, cmd, *args) : terom@74: out = cmd + ' ' + ' '.join(self._arg(arg) for arg in args) + '\r' terom@74: terom@74: log.info("%s", out) terom@74: terom@74: self.io.write(out) terom@74: self.io.flush() terom@74: terom@74: ret = self.io.read(len(out)) terom@74: terom@74: if '!' in ret : terom@74: raise DMXCommandError(cmd=out, out=ret) terom@74: terom@74: elif '?' in ret : terom@74: raise DMXUnknownCommandError(cmd=cmd) terom@74: terom@74: def clear (self) : terom@74: """ terom@74: Set dmx = [ ] terom@74: terom@74: i.e. start transmitting zero-length DMX packets. terom@74: For most lights, this seems to be equivalent to losing the DMX signal, and they retain their old state. terom@74: """ terom@74: terom@74: self('c') terom@74: terom@74: def zero (self) : terom@74: """ terom@74: Set dmx = [0, ...] terom@74: terom@74: Uses the maximum DMX packet length available. terom@74: """ terom@74: terom@74: self('z') terom@74: terom@74: def out (self, *values) : terom@74: """ terom@74: Set dmx = (value, ...) terom@74: """ terom@74: terom@74: self('o', *values) terom@74: terom@74: def set (self, start, *values) : terom@74: """ terom@74: Set dmx[start:] = value terom@74: """ terom@74: terom@74: self('s', start, *values) terom@74: terom@74: def fill (self, start, end, *values) : terom@74: """ terom@74: Set dmx[start:end] to repetitions of (value, ...) terom@74: """ terom@74: terom@74: self('f', start, end, *values) terom@74: terom@74: def range (self, start, stop, step, value) : terom@74: """ terom@74: Set dmx[start:end:step] = value terom@74: """ terom@74: terom@74: self('r', start, stop, step, value) terom@74: terom@74: def __setitem__ (self, index, value) : terom@74: if isinstance(value, collections.Sequence) : terom@74: values = tuple(value) terom@74: else : terom@74: values = (value, ) terom@74: terom@74: if isinstance(index, slice) : terom@74: if index.start and index.stop and index.step : terom@74: # XXX: single terom@74: self.range(index.start, index.stop, index.step, value) terom@74: terom@74: elif index.start and index.stop : terom@74: self.fill(index.start, index.stop, *values) terom@74: terom@74: elif index.start : terom@74: self.set(index.start, *values) terom@74: terom@74: else : terom@74: raise IndexError("invalid slice: %s" % (index, )) terom@74: terom@74: else : terom@74: # simple set terom@74: self.set(index, *values) terom@74: