|
1 #!/usr/bin/env python |
|
2 |
|
3 import serial |
|
4 import time |
|
5 |
|
6 import logging; log = logging.getLogger('dmx') |
|
7 |
|
8 class DMXError (Exception) : |
|
9 def __init__ (self, **kwargs) : |
|
10 self.kwargs = kwargs |
|
11 |
|
12 def __str__ (self) : |
|
13 return self.__doc__.strip().format(**self.kwargs) |
|
14 |
|
15 class DMXCommandError (DMXError) : |
|
16 """ |
|
17 Command {cmd!r} failed: {out!r} |
|
18 """ |
|
19 |
|
20 class DMXUnknownCommandError (DMXError) : |
|
21 """ |
|
22 Unknown command: {cmd!r} |
|
23 """ |
|
24 |
|
25 class DMX (object) : |
|
26 SERIAL = '/dev/arduino' |
|
27 SERIAL_BAUD = 9600 |
|
28 SERIAL_TIMEOUT = 1.0 |
|
29 |
|
30 @classmethod |
|
31 def open (cls, path, baud=SERIAL_BAUD, timeout=SERIAL_TIMEOUT) : |
|
32 return cls(serial.Serial(path, baud, timeout=timeout)) |
|
33 |
|
34 def __init__ (self, io) : |
|
35 self.io = io |
|
36 |
|
37 # XXX: bug |
|
38 self.io.write('\r') |
|
39 self.io.flush() |
|
40 self.io.read(1) |
|
41 |
|
42 def __call__ (self, cmd, *args) : |
|
43 out = cmd + ' ' + ' '.join(str(arg) for arg in args) + '\r' |
|
44 |
|
45 log.info("%s", out) |
|
46 |
|
47 self.io.write(out) |
|
48 self.io.flush() |
|
49 |
|
50 ret = self.io.read(len(out)) |
|
51 |
|
52 if '!' in ret : |
|
53 raise DMXCommandError(cmd=out, out=ret) |
|
54 |
|
55 elif '?' in ret : |
|
56 raise DMXUnknownCommandError(cmd=cmd) |
|
57 |
|
58 def clear (self) : |
|
59 """ |
|
60 Set dmx = [ ] |
|
61 |
|
62 i.e. start transmitting zero-length DMX packets. |
|
63 For most lights, this seems to be equivalent to losing the DMX signal, and they retain their old state. |
|
64 """ |
|
65 |
|
66 self('c') |
|
67 |
|
68 def zero (self) : |
|
69 """ |
|
70 Set dmx = [0, ...] |
|
71 |
|
72 Uses the maximum DMX packet length available. |
|
73 """ |
|
74 |
|
75 self('z') |
|
76 |
|
77 def out (self, *values) : |
|
78 """ |
|
79 Set dmx = (value, ...) |
|
80 """ |
|
81 |
|
82 self('o', *values) |
|
83 |
|
84 def set (self, start, *values) : |
|
85 """ |
|
86 Set dmx[start:] = value |
|
87 """ |
|
88 |
|
89 self('s', start, *values) |
|
90 |
|
91 def fill (self, start, end, *values) : |
|
92 """ |
|
93 Set dmx[start:end] to repetitions of (value, ...) |
|
94 """ |
|
95 |
|
96 self('f', start, end, *values) |
|
97 |
|
98 def range (self, start, stop, step, value) : |
|
99 """ |
|
100 Set dmx[start:end:step] = value |
|
101 """ |
|
102 |
|
103 self('r', start, stop, step, value) |
|
104 |
|
105 import argparse |
|
106 |
|
107 def main (argv) : |
|
108 parser = argparse.ArgumentParser() |
|
109 parser.add_argument('--serial', default=DMX.SERIAL, |
|
110 help="Path to /dev/tty*") |
|
111 parser.add_argument('--zero', action='store_true', |
|
112 help="Zero output before setting") |
|
113 parser.add_argument('--start', type=int, |
|
114 help="Set from start offset") |
|
115 parser.add_argument('--stop', type=int, |
|
116 help="Set to end offset") |
|
117 parser.add_argument('--step', type=int, |
|
118 help="Step") |
|
119 |
|
120 parser.add_argument('channels', nargs='*', type=int, |
|
121 help="Output channel values") |
|
122 |
|
123 options = parser.parse_args(argv[1:]) |
|
124 |
|
125 logging.basicConfig(level=logging.DEBUG) |
|
126 |
|
127 dmx = DMX.open(options.serial) |
|
128 |
|
129 if options.zero : |
|
130 dmx.zero() |
|
131 |
|
132 |
|
133 if options.start and options.stop and options.step : |
|
134 dmx.range(options.start, options.stop, options.step, *options.channels) |
|
135 |
|
136 elif options.start and options.stop : |
|
137 dmx.fill(options.start, options.stop, *options.channels) |
|
138 |
|
139 elif options.start : |
|
140 dmx.set(options.start, *options.channels) |
|
141 |
|
142 elif options.channels : |
|
143 dmx.out(*options.channels) |
|
144 |
|
145 if __name__ == '__main__': |
|
146 import sys |
|
147 sys.exit(main(sys.argv)) |