import collections
import logging; log = logging.getLogger('qmsk.dmx.control')
import serial
"""
Low-level DMX channel output.
"""
class DMXError (Exception) :
def __init__ (self, **kwargs) :
self.kwargs = kwargs
def __str__ (self) :
return self.__doc__.strip().format(**self.kwargs)
class DMXCommandError (DMXError) :
"""
Command {cmd!r} failed: {out!r}
"""
class DMXUnknownCommandError (DMXError) :
"""
Unknown command: {cmd!r}
"""
class DMX (object) :
"""
Arudino-based DMX controller using src/hello-dmx.c over the serial port.
"""
SERIAL = '/dev/arduino'
SERIAL_BAUD = 9600
@classmethod
def open (cls, path, baud=SERIAL_BAUD) :
return cls(serial.Serial(path, baud))
def __init__ (self, io) :
self.io = io
# XXX: sync initial line
self.sync()
self.sync()
def sync (self) :
"""
XXX: startup sync
"""
self.io.write(b'\r')
self.io.flush()
self.io.read(1)
def _arg (self, arg) :
"""
Format given value as a protocol argument.
"""
if isinstance(arg, str) :
value, = arg
value = ord(value)
elif isinstance(arg, int) :
value = arg
else :
raise ValueError(arg)
if 0 <= value <= 255 :
return str(value).encode('ascii')
else :
raise ValueError(value)
def __iter__ (self) :
"""
Read command responses back.
"""
while True:
yield self.io.readline()
def __call__ (self, cmd, *args, **opts) :
"""
<chr> [<int> [...]]
"""
# XXX:
poll = opts.pop('poll', True)
out = cmd + b' ' + b' '.join(self._arg(arg) for arg in args)
log.info("%s", out)
self.io.write(out)
self.io.write(b'\r')
self.io.flush()
if poll:
for ret in self:
break
if b'!' in ret :
raise DMXCommandError(cmd=out, out=ret)
elif b'?' in ret :
raise DMXUnknownCommandError(cmd=cmd)
def clear (self, **opts) :
"""
Set dmx = [ ]
i.e. start transmitting zero-length DMX packets.
For most lights, this seems to be equivalent to losing the DMX signal, and they retain their old state.
"""
self(b'c', **opts)
def zero (self, **opts) :
"""
Set dmx = [0, ...]
Uses the maximum DMX packet length available.
"""
self(b'z', **opts)
def out (self, *values, **opts) :
"""
Set dmx = (value, ...)
"""
self(b'o', *values, **opts)
def set (self, start, *values, **opts) :
"""
Set dmx[start:] = value
"""
self(b's', start, *values, **opts)
def fill (self, start, end, *values, **opts) :
"""
Set dmx[start:end] to repetitions of (value, ...)
"""
self(b'f', start, end, *values, **opts)
def range (self, start, stop, step, value, **opts) :
"""
Set dmx[start:end:step] = value
"""
self(b'r', start, stop, step, value, **opts)
def __setitem__ (self, index, value) :
"""
Magic indexing.
"""
if isinstance(value, collections.Sequence) :
values = tuple(value)
else :
values = (value, )
if isinstance(index, slice) :
if index.start and index.stop and index.step :
# XXX: single
self.range(index.start, index.stop, index.step, value)
elif index.start and index.stop :
self.fill(index.start, index.stop, *values)
elif index.start :
self.set(index.start, *values)
else :
raise IndexError("invalid slice: %s" % (index, ))
else :
# simple set
self.set(index, *values)