# HG changeset patch # User Tero Marttila # Date 1397241920 -10800 # Node ID 9031dafa39d67f92528c67a228b98dc129de23b9 # Parent 3c25f32c92fa78050819847a8bdf152bf9d8cfdf dmx: split into qmsk.dmx diff -r 3c25f32c92fa -r 9031dafa39d6 .hgignore --- a/.hgignore Fri Apr 11 20:46:33 2014 +0300 +++ b/.hgignore Fri Apr 11 21:45:20 2014 +0300 @@ -8,4 +8,7 @@ *.hex *.s.* +*.pyc + docs/ +opt/ diff -r 3c25f32c92fa -r 9031dafa39d6 bin/dmx.py --- a/bin/dmx.py Fri Apr 11 20:46:33 2014 +0300 +++ b/bin/dmx.py Fri Apr 11 21:45:20 2014 +0300 @@ -1,155 +1,14 @@ #!/usr/bin/env python -import collections -import serial -import time +import qmsk.dmx import logging; log = logging.getLogger('dmx') -class DMXError (Exception) : - def __init__ (self, **kwargs) : - self.kwargs = kwargs - - def __str__ (self) : - return self.__doc__.strip().format(**self.kwargs) - -class DMXCommandError (DMXError) : - """ - Command {cmd!r} failed: {out!r} - """ - -class DMXUnknownCommandError (DMXError) : - """ - Unknown command: {cmd!r} - """ - -class DMX (object) : - """ - Arudino-based DMX controller using src/hello-dmx.c over the serial port. - """ - - SERIAL = '/dev/arduino' - SERIAL_BAUD = 9600 - SERIAL_TIMEOUT = 1.0 - - @classmethod - def open (cls, path, baud=SERIAL_BAUD, timeout=SERIAL_TIMEOUT) : - return cls(serial.Serial(path, baud, timeout=timeout)) - - def __init__ (self, io) : - self.io = io - - # XXX: bug - self.io.write('\r') - self.io.flush() - self.io.read(1) - - def _arg (self, arg) : - if isinstance(arg, str) : - value, = arg - value = ord(value) - elif isinstance(arg, int) : - value = arg - else : - raise ValueError(arg) - - if 0 <= value <= 255 : - return str(value) - else : - raise ValueError(value) - - def __call__ (self, cmd, *args) : - out = cmd + ' ' + ' '.join(self._arg(arg) for arg in args) + '\r' - - log.info("%s", out) - - self.io.write(out) - self.io.flush() - - ret = self.io.read(len(out)) - - if '!' in ret : - raise DMXCommandError(cmd=out, out=ret) - - elif '?' in ret : - raise DMXUnknownCommandError(cmd=cmd) - - def clear (self) : - """ - Set dmx = [ ] - - i.e. start transmitting zero-length DMX packets. - For most lights, this seems to be equivalent to losing the DMX signal, and they retain their old state. - """ - - self('c') - - def zero (self) : - """ - Set dmx = [0, ...] - - Uses the maximum DMX packet length available. - """ - - self('z') - - def out (self, *values) : - """ - Set dmx = (value, ...) - """ - - self('o', *values) - - def set (self, start, *values) : - """ - Set dmx[start:] = value - """ - - self('s', start, *values) - - def fill (self, start, end, *values) : - """ - Set dmx[start:end] to repetitions of (value, ...) - """ - - self('f', start, end, *values) - - def range (self, start, stop, step, value) : - """ - Set dmx[start:end:step] = value - """ - - self('r', start, stop, step, value) - - def __setitem__ (self, index, value) : - if isinstance(value, collections.Sequence) : - values = tuple(value) - else : - values = (value, ) - - if isinstance(index, slice) : - if index.start and index.stop and index.step : - # XXX: single - self.range(index.start, index.stop, index.step, value) - - elif index.start and index.stop : - self.fill(index.start, index.stop, *values) - - elif index.start : - self.set(index.start, *values) - - else : - raise IndexError("invalid slice: %s" % (index, )) - - else : - # simple set - self.set(index, *values) - import argparse def main (argv) : parser = argparse.ArgumentParser() - parser.add_argument('--serial', default=DMX.SERIAL, + parser.add_argument('--serial', default=qmsk.dmx.DMX.SERIAL, help="Path to /dev/tty*") parser.add_argument('--zero', action='store_true', help="Zero output before setting") @@ -167,7 +26,7 @@ logging.basicConfig(level=logging.DEBUG) - dmx = DMX.open(options.serial) + dmx = qmsk.dmx.DMX.open(options.serial) if options.zero : dmx.zero() diff -r 3c25f32c92fa -r 9031dafa39d6 qmsk/dmx/__init__.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/qmsk/dmx/__init__.py Fri Apr 11 21:45:20 2014 +0300 @@ -0,0 +1,4 @@ +from qmsk.dmx.control import ( + DMXError, + DMX, +) diff -r 3c25f32c92fa -r 9031dafa39d6 qmsk/dmx/control.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/qmsk/dmx/control.py Fri Apr 11 21:45:20 2014 +0300 @@ -0,0 +1,143 @@ +import collections +import logging; log = logging.getLogger('qmsk.dmx.control') +import serial + +class DMXError (Exception) : + def __init__ (self, **kwargs) : + self.kwargs = kwargs + + def __str__ (self) : + return self.__doc__.strip().format(**self.kwargs) + +class DMXCommandError (DMXError) : + """ + Command {cmd!r} failed: {out!r} + """ + +class DMXUnknownCommandError (DMXError) : + """ + Unknown command: {cmd!r} + """ + +class DMX (object) : + """ + Arudino-based DMX controller using src/hello-dmx.c over the serial port. + """ + + SERIAL = '/dev/arduino' + SERIAL_BAUD = 9600 + SERIAL_TIMEOUT = 1.0 + + @classmethod + def open (cls, path, baud=SERIAL_BAUD, timeout=SERIAL_TIMEOUT) : + return cls(serial.Serial(path, baud, timeout=timeout)) + + def __init__ (self, io) : + self.io = io + + # XXX: bug + self.io.write('\r') + self.io.flush() + self.io.read(1) + + def _arg (self, arg) : + if isinstance(arg, str) : + value, = arg + value = ord(value) + elif isinstance(arg, int) : + value = arg + else : + raise ValueError(arg) + + if 0 <= value <= 255 : + return str(value) + else : + raise ValueError(value) + + def __call__ (self, cmd, *args) : + out = cmd + ' ' + ' '.join(self._arg(arg) for arg in args) + '\r' + + log.info("%s", out) + + self.io.write(out) + self.io.flush() + + ret = self.io.read(len(out)) + + if '!' in ret : + raise DMXCommandError(cmd=out, out=ret) + + elif '?' in ret : + raise DMXUnknownCommandError(cmd=cmd) + + def clear (self) : + """ + Set dmx = [ ] + + i.e. start transmitting zero-length DMX packets. + For most lights, this seems to be equivalent to losing the DMX signal, and they retain their old state. + """ + + self('c') + + def zero (self) : + """ + Set dmx = [0, ...] + + Uses the maximum DMX packet length available. + """ + + self('z') + + def out (self, *values) : + """ + Set dmx = (value, ...) + """ + + self('o', *values) + + def set (self, start, *values) : + """ + Set dmx[start:] = value + """ + + self('s', start, *values) + + def fill (self, start, end, *values) : + """ + Set dmx[start:end] to repetitions of (value, ...) + """ + + self('f', start, end, *values) + + def range (self, start, stop, step, value) : + """ + Set dmx[start:end:step] = value + """ + + self('r', start, stop, step, value) + + def __setitem__ (self, index, value) : + if isinstance(value, collections.Sequence) : + values = tuple(value) + else : + values = (value, ) + + if isinstance(index, slice) : + if index.start and index.stop and index.step : + # XXX: single + self.range(index.start, index.stop, index.step, value) + + elif index.start and index.stop : + self.fill(index.start, index.stop, *values) + + elif index.start : + self.set(index.start, *values) + + else : + raise IndexError("invalid slice: %s" % (index, )) + + else : + # simple set + self.set(index, *values) +