bin/dmx.py
author Tero Marttila <terom@paivola.fi>
Fri, 11 Apr 2014 20:46:33 +0300
changeset 73 3c25f32c92fa
parent 71 24f00b561a4a
child 74 9031dafa39d6
permissions -rwxr-xr-x
dmx: implement __setitem__
#!/usr/bin/env python

import collections
import serial
import time

import logging; log = logging.getLogger('dmx')

class DMXError (Exception) :
    def __init__ (self, **kwargs) :
        self.kwargs = kwargs

    def __str__ (self) :
        return self.__doc__.strip().format(**self.kwargs)

class DMXCommandError (DMXError) :
    """
        Command {cmd!r} failed: {out!r}
    """

class DMXUnknownCommandError (DMXError) :
    """
        Unknown command: {cmd!r}
    """

class DMX (object) :
    """
        Arudino-based DMX controller using src/hello-dmx.c over the serial port.
    """

    SERIAL = '/dev/arduino'
    SERIAL_BAUD = 9600
    SERIAL_TIMEOUT = 1.0

    @classmethod
    def open (cls, path, baud=SERIAL_BAUD, timeout=SERIAL_TIMEOUT) :
        return cls(serial.Serial(path, baud, timeout=timeout))

    def __init__ (self, io) :
        self.io = io

        # XXX: bug
        self.io.write('\r')
        self.io.flush()
        self.io.read(1)

    def _arg (self, arg) :
        if isinstance(arg, str) :
            value, = arg
            value = ord(value)
        elif isinstance(arg, int) :
            value = arg
        else :
            raise ValueError(arg)

        if 0 <= value <= 255 :
            return str(value)
        else :
            raise ValueError(value)

    def __call__ (self, cmd, *args) :
        out = cmd + ' ' + ' '.join(self._arg(arg) for arg in args) + '\r'
        
        log.info("%s", out)
        
        self.io.write(out)
        self.io.flush()

        ret = self.io.read(len(out))

        if '!' in ret :
            raise DMXCommandError(cmd=out, out=ret)

        elif '?' in ret :
            raise DMXUnknownCommandError(cmd=cmd)

    def clear (self) :
        """
            Set dmx = [ ]

            i.e. start transmitting zero-length DMX packets.
            For most lights, this seems to be equivalent to losing the DMX signal, and they retain their old state.
        """

        self('c')

    def zero (self) :
        """
            Set dmx = [0, ...]

            Uses the maximum DMX packet length available.
        """

        self('z')

    def out (self, *values) :
        """
            Set dmx = (value, ...)
        """

        self('o', *values)

    def set (self, start, *values) :
        """
            Set dmx[start:] = value
        """

        self('s', start, *values)

    def fill (self, start, end, *values) :
        """
            Set dmx[start:end] to repetitions of (value, ...)
        """

        self('f', start, end, *values)

    def range (self, start, stop, step, value) :
        """
            Set dmx[start:end:step] = value
        """

        self('r', start, stop, step, value)

    def __setitem__ (self, index, value) :
        if isinstance(value, collections.Sequence) :
            values = tuple(value)
        else :
            values = (value, )

        if isinstance(index, slice) :
            if index.start and index.stop and index.step :
                # XXX: single
                self.range(index.start, index.stop, index.step, value)

            elif index.start and index.stop :
                self.fill(index.start, index.stop, *values)

            elif index.start :
                self.set(index.start, *values)

            else :
                raise IndexError("invalid slice: %s" % (index, ))
        
        else :
            # simple set
            self.set(index, *values)

import argparse

def main (argv) :
    parser = argparse.ArgumentParser()
    parser.add_argument('--serial', default=DMX.SERIAL,
            help="Path to /dev/tty*")
    parser.add_argument('--zero', action='store_true',
            help="Zero output before setting")
    parser.add_argument('--start', type=int,
            help="Set from start offset")
    parser.add_argument('--stop', type=int,
            help="Set to end offset")
    parser.add_argument('--step', type=int,
            help="Step")

    parser.add_argument('channels', nargs='*', type=int,
            help="Output channel values")

    options = parser.parse_args(argv[1:])

    logging.basicConfig(level=logging.DEBUG)

    dmx = DMX.open(options.serial)

    if options.zero :
        dmx.zero()


    if options.start and options.stop and options.step :
        dmx.range(options.start, options.stop, options.step, *options.channels)

    elif options.start and options.stop :
        dmx.fill(options.start, options.stop, *options.channels)

    elif options.start :
        dmx.set(options.start, *options.channels)

    elif options.channels :
        dmx.out(*options.channels)

if __name__ == '__main__':
    import sys
    sys.exit(main(sys.argv))