#!/usr/bin/env python
import collections
import serial
import time
import logging; log = logging.getLogger('dmx')
# Base class for all DMX errors.
#
# The human-readable message of an error is built from the *docstring* of the
# concrete exception class: the docstring is stripped and used as a
# str.format() template, filled in with the keyword arguments that were passed
# to the constructor. This base class deliberately has no docstring of its
# own, since any docstring here would become its message template.
class DMXError(Exception):

    def __init__(self, **kwargs):
        # Exception.__init__ accepts no keyword arguments, so the fields are
        # only stored on the instance.
        super(DMXError, self).__init__()

        # Template fields; used by __str__ and available to callers.
        self.kwargs = kwargs

    def __str__(self):
        # Format the class docstring with the stored fields.
        #
        # Falls back to a generic "ClassName(key=value, ...)" rendering when
        # there is no docstring (class defined without one, or docstrings
        # stripped by `python -OO`) or when the template references a field
        # that was not supplied, so that str() on an error never raises.
        template = self.__doc__

        if template:
            try:
                return template.strip().format(**self.kwargs)
            except (KeyError, IndexError):
                # template names a field missing from kwargs
                pass

        fields = ', '.join('%s=%r' % item for item in sorted(self.kwargs.items()))

        return '%s(%s)' % (type(self).__name__, fields)
# Raised by DMX.__call__ when the reply to a command contains '!'.
#
# NOTE: the docstring is the runtime message template (DMXError.__str__ strips
# it and formats it with the constructor's keyword arguments), so it must keep
# its `cmd` and `out` fields.
class DMXCommandError(DMXError):
    """
    Command {cmd!r} failed: {out!r}
    """
# Raised by DMX.__call__ when the reply to a command contains '?'.
#
# NOTE: the docstring is the runtime message template (DMXError.__str__ strips
# it and formats it with the constructor's keyword arguments), so it must keep
# its `cmd` field.
class DMXUnknownCommandError(DMXError):
    """
    Unknown command: {cmd!r}
    """
class DMX(object):
    """
    Arduino-based DMX controller using src/hello-dmx.c over the serial port.

    Commands are sent as ASCII lines of the form "<cmd> <arg> <arg> ...\\r",
    where every argument is rendered as the decimal value of one byte
    (0..255). After each command, as many bytes as were written are read back
    and checked for the '!' (command failed) and '?' (unknown command) markers.
    """

    SERIAL = '/dev/arduino'
    SERIAL_BAUD = 9600
    SERIAL_TIMEOUT = 1.0

    @classmethod
    def open(cls, path, baud=SERIAL_BAUD, timeout=SERIAL_TIMEOUT):
        """
        Open the serial port at `path` and return a controller using it.
        """
        return cls(serial.Serial(path, baud, timeout=timeout))

    def __init__(self, io):
        """
        Wrap an already opened serial-like object.

        `io` must provide write(bytes), flush() and read(count).
        """
        self.io = io

        # XXX: bug
        # Send one bare line terminator and read a single byte back before any
        # real command is issued; presumably a workaround for a startup quirk
        # of the controller -- TODO confirm.
        self.io.write(b'\r')
        self.io.flush()
        self.io.read(1)

    def _arg(self, arg):
        """
        Render one command argument as the decimal string of a byte value.

        `arg` is either an int, or a single character / single byte whose
        ordinal is used.

        Raises ValueError for any other type, for strings that are not exactly
        one item long, and for values outside 0..255.
        """
        if isinstance(arg, (str, bytes)):
            if len(arg) != 1:
                raise ValueError(arg)
            value = ord(arg)
        elif isinstance(arg, int):
            value = arg
        else:
            raise ValueError(arg)

        if 0 <= value <= 255:
            return str(value)
        else:
            raise ValueError(value)

    def __call__(self, cmd, *args):
        """
        Send the command `cmd` with the given byte arguments and check the reply.

        Raises DMXCommandError if the reply contains '!', and
        DMXUnknownCommandError if it contains '?'.
        """
        out = cmd + ' ' + ' '.join(self._arg(arg) for arg in args) + '\r'

        log.info("%s", out)

        # The serial port transfers bytes, not text: encode on the way out and
        # compare the reply against byte markers.
        data = out.encode('ascii')

        self.io.write(data)
        self.io.flush()

        ret = self.io.read(len(data))

        if b'!' in ret:
            raise DMXCommandError(cmd=out, out=ret)
        elif b'?' in ret:
            raise DMXUnknownCommandError(cmd=cmd)

    def clear(self):
        """
        Set dmx = [ ]

        i.e. start transmitting zero-length DMX packets.
        For most lights, this seems to be equivalent to losing the DMX signal, and they retain their old state.
        """
        self('c')

    def zero(self):
        """
        Set dmx = [0, ...]

        Uses the maximum DMX packet length available.
        """
        self('z')

    def out(self, *values):
        """
        Set dmx = (value, ...)
        """
        self('o', *values)

    def set(self, start, *values):
        """
        Set dmx[start:] = (value, ...)
        """
        self('s', start, *values)

    def fill(self, start, end, *values):
        """
        Set dmx[start:end] to repetitions of (value, ...)
        """
        self('f', start, end, *values)

    def range(self, start, stop, step, value):
        """
        Set dmx[start:stop:step] = value
        """
        self('r', start, stop, step, value)

    def __setitem__(self, index, value):
        """
        Item/slice assignment, mapped onto the controller commands:

            dmx[i] = value(s)           -> set(i, value, ...)
            dmx[a:] = value(s)          -> set(a, value, ...)
            dmx[a:b] = value(s)         -> fill(a, b, value, ...)
            dmx[a:b:step] = value       -> range(a, b, step, value)

        `value` may be a single value or a sequence of values; the stepped
        form takes exactly one value (ValueError otherwise).

        Raises IndexError for slices without a start.
        """
        # collections.Sequence is gone from Python 3.10 on; the ABCs live in
        # collections.abc (which does not exist on Python 2).
        try:
            from collections.abc import Sequence
        except ImportError:
            from collections import Sequence

        if isinstance(value, Sequence):
            values = tuple(value)
        else:
            values = (value, )

        if not isinstance(index, slice):
            # simple set
            self.set(index, *values)
            return

        start, stop, step = index.start, index.stop, index.step

        # Compare against None rather than relying on truthiness, so that
        # offset 0 is a valid slice bound.
        if start is None:
            raise IndexError("invalid slice: %s" % (index, ))

        if stop is None:
            # open-ended slice; a step without a stop is ignored
            self.set(start, *values)
        elif not step:
            # no step given (a zero step is treated the same way)
            self.fill(start, stop, *values)
        else:
            # XXX: single
            if len(values) != 1:
                raise ValueError(value)

            self.range(start, stop, step, values[0])
import argparse
def main(argv):
    """
    Command-line entry point.

    `argv` is the full argument vector, including the program name.

    Depending on which of --start / --stop / --step are given, the channel
    values are sent using DMX.range, DMX.fill, DMX.set or DMX.out; --zero
    zeroes the output first. Returns 0.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--serial', default=DMX.SERIAL,
            help="Path to /dev/tty*")
    parser.add_argument('--zero', action='store_true',
            help="Zero output before setting")
    parser.add_argument('--start', type=int,
            help="Set from start offset")
    parser.add_argument('--stop', type=int,
            help="Set to end offset")
    parser.add_argument('--step', type=int,
            help="Step")
    parser.add_argument('channels', nargs='*', type=int,
            help="Output channel values")

    options = parser.parse_args(argv[1:])

    # Compare against None rather than relying on truthiness, so that an
    # explicit offset of 0 counts as given.
    have_start = options.start is not None
    have_stop = options.stop is not None
    use_range = bool(have_start and have_stop and options.step)

    # DMX.range() takes exactly one value; report a usage error up front
    # instead of failing with a TypeError after the port has been opened.
    if use_range and len(options.channels) != 1:
        parser.error("--step requires exactly one channel value")

    logging.basicConfig(level=logging.DEBUG)

    dmx = DMX.open(options.serial)

    try:
        if options.zero:
            dmx.zero()

        if use_range:
            dmx.range(options.start, options.stop, options.step, options.channels[0])
        elif have_start and have_stop:
            dmx.fill(options.start, options.stop, *options.channels)
        elif have_start:
            dmx.set(options.start, *options.channels)
        elif options.channels:
            dmx.out(*options.channels)
    finally:
        # release the serial port even if a command failed
        dmx.io.close()

    return 0
# Script entry point: run the command-line interface with the process
# arguments and use main()'s return value as the exit status.
if __name__ == '__main__':
    import sys

    status = main(sys.argv)
    sys.exit(status)