# HG changeset patch # User Tero Marttila # Date 1289052148 -7200 # Node ID 00907acd732a2abdf5180276b3d5e2b295338626 # Parent eb0545ec03e7885db2d083fae53130b0c65f6d9c# Parent 8574aeff9b367350318182956d0bee7cd0972343 Merge with api restructuring diff -r eb0545ec03e7 -r 00907acd732a etc/fixbot-logwatch.py --- a/etc/fixbot-logwatch.py Sat Nov 06 16:01:42 2010 +0200 +++ b/etc/fixbot-logwatch.py Sat Nov 06 16:02:28 2010 +0200 @@ -18,10 +18,10 @@ filters.su_nobody_killer, filters.all, )), - UnixDatagramSocket("test", os.path.join(logwatch_dir, "test.sock"), ( - filters.sudo, - filters.ssh, - filters.cron_killer, - filters.all, - )), + # UnixDatagramSocket("test", os.path.join(logwatch_dir, "test.sock"), ( + # filters.sudo, + # filters.ssh, + # filters.cron_killer, + # filters.all, + # )), ) diff -r eb0545ec03e7 -r 00907acd732a fixbot/api.py --- a/fixbot/api.py Sat Nov 06 16:01:42 2010 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,213 +0,0 @@ -from twisted.application import internet, service -from twisted.internet import protocol, reactor -from twisted.python import log, usage -from datetime import datetime -import sys - -from fixbot import buffer, config - -class ModuleInfo (object) : - """ - Some info about a module - """ - - # module's name - name = None - - def __str__ (self) : - return "Module %s:" % (self.name) - - def __repr__ (self) : - return "" % (self.name, ) - -class Event (object) : - # the ModuleInfo object - module = None - - # the event type as a string - type = None - - # event message as a string (up to 64k, although that won't fit onto IRC..) - msg = None - - # timestamp as a datetime.datetime - when = None - - def __init__ (self, module, type, msg) : - self.module = module - self.type = type - self.msg = msg - - self.when = datetime.now() - - def __str__ (self) : - return "[%s] %s" % (self.type, self.msg) - - def __repr__ (self) : - return "%s @ %s" % (self.type, self.when) - -CLIENT_COMMANDS = [ - "module_init", - "module_event", -] - -SERVER_COMMANDS = [ - "module_ok", -] - -class ServerProtocol (buffer.StreamProtocol, protocol.Protocol) : - RECV_COMMANDS = CLIENT_COMMANDS - SEND_COMMANDS = SERVER_COMMANDS - - VALID_STATES = [ - "wait_init", - "wait_event" - ] - - # proto state - state = None - - # module info - module = None - - def _assert (self, condition, msg) : - if not condition : - self.transport.loseConnection() - log.err("assert failed in APIProtocol for %s: %s" % (self.module, msg)) - - def connectionMade (self) : - log.msg("Client connected") - - def connectionLost (self, reason) : - log.msg("Connection lost: %s" % reason) - - if self.module : - self.factory.nexus.unregisterModule(self.module, reason.getErrorMessage()) - - def on_module_init (self, i) : - self._assert(not self.module, "module_init with non-None self.module") - - peer_secret = i.readVarLen('B') - - self._assert(peer_secret == self.factory.secret, "Mismatching API secrets!") - - m = ModuleInfo() - - m.name = i.readVarLen('B') - m.addr = self.transport.getPeer() - - log.msg("Got mod_init for %r" % m) - - self.factory.nexus.registerModule(m, self) - - self.module = m - - o = self.startCommand('module_ok') - - self.send(o) - - def on_module_event (self, i) : - self._assert(self.module, "module_event with None self.module!") - - event_type = i.readVarLen('B') - event_msg = i.readVarLen('H') - - e = Event(self.module, event_type, event_msg) - - self.factory.nexus.handleEvent(e) - - def logPrefix (self) : - if self.module : - return str(self.module) - else : - return super(ServerProtocol, self).logPrefix() - -class ClientProtocol (buffer.StreamProtocol, protocol.Protocol) : - RECV_COMMANDS = SERVER_COMMANDS - SEND_COMMANDS = CLIENT_COMMANDS - - def connectionMade (self) : - log.msg("Connected to API server, sending module init message") - - o = self.startCommand('module_init') - o.writeVarLen('B', self.factory.secret) - o.writeVarLen('B', self.factory.name) - - self.send(o) - - def sendEvent (self, event) : - o = self.startCommand('module_event') - o.writeVarLen('B', event.type) - o.writeVarLen('H', event.msg[:2**16]) - - self.send(o) - - def on_module_ok (self, i) : - log.msg("Registration OK") - - self.factory.connected(self) - - def logPrefix (self) : - return "module %s client" % (self.factory.name) - -class Module (ModuleInfo, protocol.ClientFactory) : - protocol = ClientProtocol - - def __init__ (self, config) : - self.connection = None - self.secret = config['api-secret'] - - def connected (self, connection) : - log.msg("Connected!") - self.connection = connection - - self.handleConnect() - - def disconnect (self) : - self.connection.transport.loseConnection() - - def sendEvent (self, type, msg) : - self.connection.sendEvent(self.buildEvent(type, msg)) - - def buildEvent (self, type, msg) : - return Event(self, type, msg) - - def handleConnect (self) : - """ - Do something - """ - - pass - -class ServerFactory (protocol.ServerFactory) : - protocol = ServerProtocol - - def __init__ (self, nexus, secret) : - self.nexus = nexus - self.secret = secret - -class ClientOptions (config.ConfigOptions) : - optParameters = [ - ( "api-server", "s", "127.0.0.1", "address of API server to connect to" ), - ( "api-port", "P", 34888, "port of API server to connect to", int ), - ( "api-secret", None, None, "secret key for API connections" ), - ] - - optFlags = [ - - ] - -def makeService (module_class, config) : - s = service.MultiService() - - # build factory - factory = module_class(config) - - # the API client - log.msg("Connecting to API server on [%s:%d]" % (config['api-server'], config['api-port'])) - api_client = internet.TCPClient(config['api-server'], config['api-port'], factory) - - api_client.setServiceParent(s) - - return s - diff -r eb0545ec03e7 -r 00907acd732a fixbot/api/__init__.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/fixbot/api/__init__.py Sat Nov 06 16:02:28 2010 +0200 @@ -0,0 +1,37 @@ +from twisted.internet import protocol + +from fixbot import config +from fixbot.api import amp + +class ServerOptions (config.ConfigOptions) : + """ + Options for the API component of the Nexus + """ + + optParameters = [ + ( "api-listen", "l", "127.0.0.1", "address for API server to listen on" ), + ( "api-port", "P", 34888, "port for API server to listen on", int ), + ( "api-secret", None, None, "secret key for API connections" ), + ] + +class ClientOptions (config.ConfigOptions) : + """ + Options for the API component of a Module + """ + + optParameters = [ + ( "api-server", "s", "127.0.0.1", "address of API server to connect to" ), + ( "api-port", "P", 34888, "port of API server to connect to", int ), + ] + +class ServerFactory (protocol.ServerFactory) : + def __init__ (self, nexus, config) : + self.nexus = nexus + + # protocol to use + self.protocol = amp.ServerProtocol + + # XXX: only used for legacy + self.secret = config.get('api-secret') + + diff -r eb0545ec03e7 -r 00907acd732a fixbot/api/amp.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/fixbot/api/amp.py Sat Nov 06 16:02:28 2010 +0200 @@ -0,0 +1,155 @@ +from twisted.protocols import amp +from twisted.python import log + +from fixbot.module import ModuleInfo, Event + +class CmdModuleRegister (amp.Command) : + """ + Register module + """ + + arguments = [ + ('name', amp.String()), + ] + +class CmdModuleEvent (amp.Command) : + """ + Broadcast event + """ + + arguments = [ + ('type', amp.String()), + ('msg', amp.String()), + ] + +class CmdModuleAbort (amp.Command) : + """ + Module has failed and will now disconnect + """ + + arguments = [ + ('msg', amp.String()) # string describing the error occuring + ] + + requiresAnswer = False + +class ServerProtocol (amp.AMP) : + """ + Nexus-side command handler + """ + + # the registered ModuleInfo + module = None + + def connectionMade (self) : + log.msg("Module connecting from: %s" % (self.transport.getPeer())) + + + def connectionLost (self, reason) : + log.err(reason, "Module lost") + + if self.module : + # drop it + self.factory.nexus.unregisterModule(self.module, reason.getErrorMessage()) + + + @CmdModuleRegister.responder + def on_ModuleRegister (self, name) : + # construct the ModuleInfo + mi = ModuleInfo() + mi.name = name + + log.msg("Module registered: %s" % (mi)) + + # register + self.factory.nexus.registerModule(mi, self) + self.module = mi + + # ok + return {} + + @CmdModuleEvent.responder + def on_ModuleEvent (self, type, msg) : + # as Event + e = Event(self.module, type, msg) + + # publish + self.factory.nexus.handleEvent(e) + + # ok + return {} + + @CmdModuleAbort.responder + def on_ModuleAbort (self, msg) : + # unhook + module = self.module + self.module = None + + # report + self.factory.nexus.unregisterModule(self.module, msg) + + # XXX: stop accepting commands etc? + +class ClientProtocol (amp.AMP) : + """ + Module-side command sender/handler + """ + + + def connectionMade (self) : + """ + Connected to nexus, send ModuleRegister + """ + + # register + self.sendModuleRegister(self.factory.name).addCallback(self._ModuleRegisterOK) + + def connectionLost (self, reason) : + """ + Disconnected from nexus, for whatever reason... + """ + + log.err(reason, "API connection lost") + + # XXX: was this expected? Reconnect? + + def sendModuleRegister (self, name) : + """ + Register with given module name + """ + + return self.callRemote(CmdModuleRegister, name=name) + + + def _ModuleRegisterOK (self, ret) : + """ + Registered with nexus, commence operation + """ + + self.factory._onRegistered(self) + + + def sendEvent (self, event) : + """ + Broadcast event to nexus + """ + + self.callRemote(CmdModuleEvent, type=event.type, msg=event.msg) + + def sendModuleAbort (self, msg) : + """ + Send CmdModuleAbort - no response is expected + """ + + self.callRemote(CmdModuleAbort, msg=msg) + + + def abort (self, msg) : + """ + Send abort message and drop connection + """ + + # disconnect. This should leave the transport to flush buffers, and then call connectionLost + # should also stop us from sending any more commands + self.transport.loseConnection() + diff -r eb0545ec03e7 -r 00907acd732a fixbot/api/buffer.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/fixbot/api/buffer.py Sat Nov 06 16:02:28 2010 +0200 @@ -0,0 +1,417 @@ +try: + from cStringIO import StringIO +except ImportError: + from StringIO import StringIO +import struct + +# prefixed to all struct format strings +STRUCT_PREFIX = '!' + +def hex (bytes) : + return ' '.join(['%#04x' % ord(b) for b in bytes]) + +class NotEnoughDataError (Exception) : + pass + +class IStreamBase (object) : + # prefixed to all struct format strings + STRUCT_PREFIX = '!' + +class IReadStream (IStreamBase) : + """ + IReadStream simply provides various interfaces for reading bytes from a + stream in various ways + """ + + def read (self, size=None) : + """ + Read and return up to the given amount of bytes, or all bytes + available if no size given. + """ + + abstract + + def readStruct (self, fmt) : + """ + Reads the correct amount of data and then unpacks it according to + the given format. Note that this always returns a tuple, for single + items, use readItem + """ + + fmt = self.STRUCT_PREFIX + fmt + + fmt_size = struct.calcsize(fmt) + data = self.read(fmt_size) + + return struct.unpack(fmt, data) + + def readItem (self, fmt) : + """ + Reads the correct amount of data, unpacks it according to the + given format, and then returns the first item. + """ + + return self.readStruct(fmt)[0] + + def readVarLen (self, len_type) : + """ + Return the data part of a structure. + len_type indicates what type length has (struct format code). + + In the case of being zero, returns an empty string. + """ + + size = self.readItem(len_type) + + if size : + return self.read(size) + else : + return "" + + def readEnum (self, enum) : + """ + Returns the item from the given list of enum values that corresponds + to a single-byte value read from the stream + """ + + return enum[self.readItem('B')] + +class ISeekableStream (IStreamBase) : + """ + Extends IStreamBase to provide the ability to seek backwards into the + stream (which still does not know it's length, and thence cannot seek + forwards). + """ + + _position = None + + def tell (self) : + """ + Return the current offset into the seekable stream + """ + + abstract + + def seek (self, pos) : + """ + Seek to the given position in the stream. + """ + + abstract + + def mark (self) : + """ + Set a mark that can be later rolled back to with .reset() + """ + + self._position = self.tell() + + def unmark (self) : + """ + Remove the mark without affecting the current position + """ + + self._position = None + + def reset (self) : + """ + Rolls the buffer back to the position set earlier with mark() + """ + + if self._position is not None : + self.seek(self._position) + self._position = None + + else : + raise Exception("reset() without mark()") + +class ISeekableReadStream (ISeekableStream, IReadStream) : + def peek (self, len=None) : + """ + Return a string representing what buf.read(len) would return, but do + not affect future read operations + """ + + pos = self.tell() + + data = self.read(len) + + self.seek(pos) + + return data + + +class INonBlockingReadStream (IReadStream) : + """ + Otherwise identical to IReadStream, but read will either return size + bytes, or raise a NotEnoughDataError + """ + + pass + +class IWriteStream (IStreamBase) : + """ + IWriteStream provides various ways to write data to a byte stream + """ + + def write (self, data) : + """ + Write the given bytes to the stream + """ + + abstract + + def writeStruct (self, fmt, *args) : + """ + Pack the given arguments with the given struct format, and write it + to the stream. + """ + + self.write(struct.pack(self.STRUCT_PREFIX + fmt, *args)) + + writeItem = writeStruct + + def writeVarLen (self, len_type, data) : + """ + Write a field into the buffer. Len_type is the + struct format code for the length field. + """ + + self.writeStruct(len_type, len(data)) + self.write(data) + + def writeEnum (self, enum, name) : + """ + Write the single-byte value correspnding to the given name's + position in the given enum + """ + + self.writeStruct('B', enum.index(name)) + +class IBufferBase (ISeekableStream) : + """ + A buffer simply provides a way to read and write data to/from a byte + sequence stored in memory. + """ + + def tell (self) : + return self._buf.tell() + + def seek (self, offset) : + return self._buf.seek(offset) + + def getvalue (self) : + """ + Returns the value of the buffer, i.e. a string with the contents of + the buffer from position zero to the end. + """ + + return self._buf.getvalue() + +class ReadBuffer (INonBlockingReadStream, ISeekableReadStream, IBufferBase) : + """ + A read-only buffer. Can be initialized with a given value and then later + replaced in various ways, but cannot be modified. + """ + + def __init__ (self, data="") : + """ + Initialize the buffer with the given data + """ + + self._buf = StringIO(data) + + def read (self, size=None) : + """ + Return the given number of bytes, or raise a NotEnoughDataError + """ + + if size == 0 : + raise ValueError("can't read zero bytes") + + if size : + ret = self._buf.read(size) + else : + ret = self._buf.read() + + if size and len(ret) < size : + raise NotEnoughDataError() + + return ret + + def append (self, data) : + """ + Modify the buffer such that it contains the old data from this + buffer, and the given data at the end. The read position in the buffer + is kept the same. + """ + + pos = self.tell() + + self._buf = StringIO(self._buf.getvalue() + data) + + self.seek(pos) + + def chop (self) : + """ + Discard the data in the buffer before the current read position. + Also removes any marks. + """ + + self._position = None + + self._buf = StringIO(self.read()) + + def processWith (self, func) : + """ + Call the given function with this buffer as an argument after + calling mark(). If the function + a) returns None, the buffer is .chop()'d, and we repeat the + process. + b) raises a NotEnoughDataError, whereupon the buffer is rolled + back to where it was before calling the function with + chop(). + c) raises a StopIteration, whereupon we chop the buffer and + return. + d) returns something (i.e. ret is not None), whereupon we + return that (and leave the current buffer position untouched). + """ + ret = None + + try : + while ret is None : + self.mark() # mark the position of the packet we are processing + ret = func(self) + + if ret is None : + # discard the processed packet and proceed to the next one + self.chop() + + except NotEnoughDataError, e : + self.reset() # reset position back to the start of the packet + return e + + except StopIteration, e: + self.chop() + return e # processed ok, but we don't want to process any further packets + + else : + return ret + +class WriteBuffer (IWriteStream, IBufferBase) : + """ + A write-only buffer. Data can be written to this buffer in various + ways, but cannot be read from it except as a whole. + """ + + def __init__ (self) : + """ + Initialize the buffer + """ + + self._buf = StringIO() + + def write (self, data) : + """ + Write the given data to the current position in the stream, + overwriting any previous data in that position, and extending + the buffer if needed + """ + + return self._buf.write(data) + +def readStringStream (stream, varlen_type) : + """ + Does readVarLen on an IReadStream until it returns something that evaluates to false ( == zero-length string) + """ + + while True : + item = stream.readVarLen(varlen_type) + + if item : + yield item + else : + return + +def writeStringStream (stream, varlen_type, strings) : + """ + Writes strings from the given iterable into the given stream using the given varlen_type, ending with a null-length token + """ + + for item in strings : + stream.writeVarLen(varlen_type, item) + + stream.writeItem(varlen_type, 0) + +class StreamProtocol (object) : + """ + A mixin to let you use Buffer with twisted.internet.protocol.Protocol + """ + + # a list of receivable command names + RECV_COMMANDS = None + + # a list of sendable command names + SEND_COMMANDS = None + + def __init__ (self) : + """ + Initialize the cross-dataReceived buffer + """ + + self.in_buffer = ReadBuffer() + + def send (self, buf) : + """ + Write the contents of the given WriteBuffer to the transport + """ + + self.transport.write(buf.getvalue()) + + def dataReceived (self, data) : + """ + Buffer the incoming data and then try and process it + """ + + self.in_buffer.append(data) + + ret = self.in_buffer.processWith(self.processPacket) + + def processPacket (self, buf) : + """ + Call processCommand with the buffer, handling the return value (either None or a deferred) + """ + + ret = self.processCommand(buf) + + if ret : + ret.addCallback(self.send) + + def processCommand (self, buf) : + """ + Process a command from the given buffer. May return a callback + """ + + return self.readMethod(buf, self.RECV_COMMANDS, buf) + + # conveniance read/write + def startCommand (self, cmd) : + buf = WriteBuffer() + + buf.writeEnum(self.SEND_COMMANDS, cmd) + + return buf + + def readMethod (self, buf, methods, *args, **kwargs) : + """ + Reads a single-byte -enum value from the given buffer and + use it to find the corresponding method (as _, + prefix can be overriden with a keyword argument and defaults to + 'on'. If any extra arguments are given, they will be passed to the + method. + """ + + prefix = kwargs.pop("prefix", "on") + + return getattr(self, "%s_%s" % (prefix, buf.readEnum(methods)))(*args, **kwargs) + diff -r eb0545ec03e7 -r 00907acd732a fixbot/api/legacy.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/fixbot/api/legacy.py Sat Nov 06 16:02:28 2010 +0200 @@ -0,0 +1,111 @@ +from twisted.internet import protocol +from twisted.python import log + +from fixbot import buffer +from fixbot.module import ModuleInfo, Event + + +CLIENT_COMMANDS = [ + "module_init", + "module_event", +] + +SERVER_COMMANDS = [ + "module_ok", +] + +class ServerProtocol (buffer.StreamProtocol, protocol.Protocol) : + RECV_COMMANDS = CLIENT_COMMANDS + SEND_COMMANDS = SERVER_COMMANDS + + VALID_STATES = [ + "wait_init", + "wait_event" + ] + + # proto state + state = None + + # module info + module = None + + def _assert (self, condition, msg) : + if not condition : + self.transport.loseConnection() + log.err("assert failed in APIProtocol for %s: %s" % (self.module, msg)) + + def connectionMade (self) : + log.msg("Client connected") + + def connectionLost (self, reason) : + log.msg("Connection lost: %s" % reason) + + if self.module : + self.factory.nexus.unregisterModule(self.module, reason.getErrorMessage()) + + def on_module_init (self, i) : + self._assert(not self.module, "module_init with non-None self.module") + + peer_secret = i.readVarLen('B') + + self._assert(peer_secret == self.factory.secret, "Mismatching API secrets!") + + m = ModuleInfo() + + m.name = i.readVarLen('B') + m.addr = self.transport.getPeer() + + log.msg("Got mod_init for %r" % m) + + self.factory.nexus.registerModule(m, self) + + self.module = m + + o = self.startCommand('module_ok') + + self.send(o) + + def on_module_event (self, i) : + self._assert(self.module, "module_event with None self.module!") + + event_type = i.readVarLen('B') + event_msg = i.readVarLen('H') + + e = Event(self.module, event_type, event_msg) + + self.factory.nexus.handleEvent(e) + + def logPrefix (self) : + if self.module : + return str(self.module) + else : + return super(ServerProtocol, self).logPrefix() + +class ClientProtocol (buffer.StreamProtocol, protocol.Protocol) : + RECV_COMMANDS = SERVER_COMMANDS + SEND_COMMANDS = CLIENT_COMMANDS + + def connectionMade (self) : + log.msg("Connected to API server, sending module init message") + + o = self.startCommand('module_init') + o.writeVarLen('B', self.factory.secret) + o.writeVarLen('B', self.factory.name) + + self.send(o) + + def sendEvent (self, event) : + o = self.startCommand('module_event') + o.writeVarLen('B', event.type) + o.writeVarLen('H', event.msg[:2**16]) + + self.send(o) + + def on_module_ok (self, i) : + log.msg("Registration OK") + + self.factory._onRegistered(self) + + def logPrefix (self) : + return "module %s client" % (self.factory.name) + diff -r eb0545ec03e7 -r 00907acd732a fixbot/buffer.py --- a/fixbot/buffer.py Sat Nov 06 16:01:42 2010 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,417 +0,0 @@ -try: - from cStringIO import StringIO -except ImportError: - from StringIO import StringIO -import struct - -# prefixed to all struct format strings -STRUCT_PREFIX = '!' - -def hex (bytes) : - return ' '.join(['%#04x' % ord(b) for b in bytes]) - -class NotEnoughDataError (Exception) : - pass - -class IStreamBase (object) : - # prefixed to all struct format strings - STRUCT_PREFIX = '!' - -class IReadStream (IStreamBase) : - """ - IReadStream simply provides various interfaces for reading bytes from a - stream in various ways - """ - - def read (self, size=None) : - """ - Read and return up to the given amount of bytes, or all bytes - available if no size given. - """ - - abstract - - def readStruct (self, fmt) : - """ - Reads the correct amount of data and then unpacks it according to - the given format. Note that this always returns a tuple, for single - items, use readItem - """ - - fmt = self.STRUCT_PREFIX + fmt - - fmt_size = struct.calcsize(fmt) - data = self.read(fmt_size) - - return struct.unpack(fmt, data) - - def readItem (self, fmt) : - """ - Reads the correct amount of data, unpacks it according to the - given format, and then returns the first item. - """ - - return self.readStruct(fmt)[0] - - def readVarLen (self, len_type) : - """ - Return the data part of a structure. - len_type indicates what type length has (struct format code). - - In the case of being zero, returns an empty string. - """ - - size = self.readItem(len_type) - - if size : - return self.read(size) - else : - return "" - - def readEnum (self, enum) : - """ - Returns the item from the given list of enum values that corresponds - to a single-byte value read from the stream - """ - - return enum[self.readItem('B')] - -class ISeekableStream (IStreamBase) : - """ - Extends IStreamBase to provide the ability to seek backwards into the - stream (which still does not know it's length, and thence cannot seek - forwards). - """ - - _position = None - - def tell (self) : - """ - Return the current offset into the seekable stream - """ - - abstract - - def seek (self, pos) : - """ - Seek to the given position in the stream. - """ - - abstract - - def mark (self) : - """ - Set a mark that can be later rolled back to with .reset() - """ - - self._position = self.tell() - - def unmark (self) : - """ - Remove the mark without affecting the current position - """ - - self._position = None - - def reset (self) : - """ - Rolls the buffer back to the position set earlier with mark() - """ - - if self._position is not None : - self.seek(self._position) - self._position = None - - else : - raise Exception("reset() without mark()") - -class ISeekableReadStream (ISeekableStream, IReadStream) : - def peek (self, len=None) : - """ - Return a string representing what buf.read(len) would return, but do - not affect future read operations - """ - - pos = self.tell() - - data = self.read(len) - - self.seek(pos) - - return data - - -class INonBlockingReadStream (IReadStream) : - """ - Otherwise identical to IReadStream, but read will either return size - bytes, or raise a NotEnoughDataError - """ - - pass - -class IWriteStream (IStreamBase) : - """ - IWriteStream provides various ways to write data to a byte stream - """ - - def write (self, data) : - """ - Write the given bytes to the stream - """ - - abstract - - def writeStruct (self, fmt, *args) : - """ - Pack the given arguments with the given struct format, and write it - to the stream. - """ - - self.write(struct.pack(self.STRUCT_PREFIX + fmt, *args)) - - writeItem = writeStruct - - def writeVarLen (self, len_type, data) : - """ - Write a field into the buffer. Len_type is the - struct format code for the length field. - """ - - self.writeStruct(len_type, len(data)) - self.write(data) - - def writeEnum (self, enum, name) : - """ - Write the single-byte value correspnding to the given name's - position in the given enum - """ - - self.writeStruct('B', enum.index(name)) - -class IBufferBase (ISeekableStream) : - """ - A buffer simply provides a way to read and write data to/from a byte - sequence stored in memory. - """ - - def tell (self) : - return self._buf.tell() - - def seek (self, offset) : - return self._buf.seek(offset) - - def getvalue (self) : - """ - Returns the value of the buffer, i.e. a string with the contents of - the buffer from position zero to the end. - """ - - return self._buf.getvalue() - -class ReadBuffer (INonBlockingReadStream, ISeekableReadStream, IBufferBase) : - """ - A read-only buffer. Can be initialized with a given value and then later - replaced in various ways, but cannot be modified. - """ - - def __init__ (self, data="") : - """ - Initialize the buffer with the given data - """ - - self._buf = StringIO(data) - - def read (self, size=None) : - """ - Return the given number of bytes, or raise a NotEnoughDataError - """ - - if size == 0 : - raise ValueError("can't read zero bytes") - - if size : - ret = self._buf.read(size) - else : - ret = self._buf.read() - - if size and len(ret) < size : - raise NotEnoughDataError() - - return ret - - def append (self, data) : - """ - Modify the buffer such that it contains the old data from this - buffer, and the given data at the end. The read position in the buffer - is kept the same. - """ - - pos = self.tell() - - self._buf = StringIO(self._buf.getvalue() + data) - - self.seek(pos) - - def chop (self) : - """ - Discard the data in the buffer before the current read position. - Also removes any marks. - """ - - self._position = None - - self._buf = StringIO(self.read()) - - def processWith (self, func) : - """ - Call the given function with this buffer as an argument after - calling mark(). If the function - a) returns None, the buffer is .chop()'d, and we repeat the - process. - b) raises a NotEnoughDataError, whereupon the buffer is rolled - back to where it was before calling the function with - chop(). - c) raises a StopIteration, whereupon we chop the buffer and - return. - d) returns something (i.e. ret is not None), whereupon we - return that (and leave the current buffer position untouched). - """ - ret = None - - try : - while ret is None : - self.mark() # mark the position of the packet we are processing - ret = func(self) - - if ret is None : - # discard the processed packet and proceed to the next one - self.chop() - - except NotEnoughDataError, e : - self.reset() # reset position back to the start of the packet - return e - - except StopIteration, e: - self.chop() - return e # processed ok, but we don't want to process any further packets - - else : - return ret - -class WriteBuffer (IWriteStream, IBufferBase) : - """ - A write-only buffer. Data can be written to this buffer in various - ways, but cannot be read from it except as a whole. - """ - - def __init__ (self) : - """ - Initialize the buffer - """ - - self._buf = StringIO() - - def write (self, data) : - """ - Write the given data to the current position in the stream, - overwriting any previous data in that position, and extending - the buffer if needed - """ - - return self._buf.write(data) - -def readStringStream (stream, varlen_type) : - """ - Does readVarLen on an IReadStream until it returns something that evaluates to false ( == zero-length string) - """ - - while True : - item = stream.readVarLen(varlen_type) - - if item : - yield item - else : - return - -def writeStringStream (stream, varlen_type, strings) : - """ - Writes strings from the given iterable into the given stream using the given varlen_type, ending with a null-length token - """ - - for item in strings : - stream.writeVarLen(varlen_type, item) - - stream.writeItem(varlen_type, 0) - -class StreamProtocol (object) : - """ - A mixin to let you use Buffer with twisted.internet.protocol.Protocol - """ - - # a list of receivable command names - RECV_COMMANDS = None - - # a list of sendable command names - SEND_COMMANDS = None - - def __init__ (self) : - """ - Initialize the cross-dataReceived buffer - """ - - self.in_buffer = ReadBuffer() - - def send (self, buf) : - """ - Write the contents of the given WriteBuffer to the transport - """ - - self.transport.write(buf.getvalue()) - - def dataReceived (self, data) : - """ - Buffer the incoming data and then try and process it - """ - - self.in_buffer.append(data) - - ret = self.in_buffer.processWith(self.processPacket) - - def processPacket (self, buf) : - """ - Call processCommand with the buffer, handling the return value (either None or a deferred) - """ - - ret = self.processCommand(buf) - - if ret : - ret.addCallback(self.send) - - def processCommand (self, buf) : - """ - Process a command from the given buffer. May return a callback - """ - - return self.readMethod(buf, self.RECV_COMMANDS, buf) - - # conveniance read/write - def startCommand (self, cmd) : - buf = WriteBuffer() - - buf.writeEnum(self.SEND_COMMANDS, cmd) - - return buf - - def readMethod (self, buf, methods, *args, **kwargs) : - """ - Reads a single-byte -enum value from the given buffer and - use it to find the corresponding method (as _, - prefix can be overriden with a keyword argument and defaults to - 'on'. If any extra arguments are given, they will be passed to the - method. - """ - - prefix = kwargs.pop("prefix", "on") - - return getattr(self, "%s_%s" % (prefix, buf.readEnum(methods)))(*args, **kwargs) - diff -r eb0545ec03e7 -r 00907acd732a fixbot/fifo.py --- a/fixbot/fifo.py Sat Nov 06 16:01:42 2010 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,90 +0,0 @@ -# read a stream from a fifo - -from twisted.internet import reactor, interfaces -from twisted.python import log -from zope.interface import implements - -import os, fcntl, errno - -class EOF (Exception) : pass - -BUF_SIZE = 2048 - -class Fifo (object) : - implements(interfaces.IReadDescriptor) - - def __init__ (self, path) : - self.path = path - self.fd = None - - self._open() - - def _open (self) : - self.fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK) - - reactor.addReader(self) - - def close (self) : - if self.fd : - reactor.removeReader(self) - os.close(self.fd) - - self.fd = None - - def reopen (self) : - """ - Close and re-open the fifo. This is useful for handling EOF - """ - self.close() - self._open() - - def _read (self, length) : - - try : - data = os.read(self.fd, length) - - except OSError, e : - if e.errno == errno.EAGAIN : - return None - else : - raise - - if not data : - raise EOF() - - return data - - def fileno (self) : - return self.fd - - def doRead (self) : - while True : - try : - data = self._read(BUF_SIZE) - except EOF : - self.handleEOF() - return - - if data : - self.dataReceived(data) - else : - break - - def dataReceived (self, data) : - pass - - def handleEOF (self) : - pass - - def connectionLost (self, reason) : - self.close() - - def logPrefix (self) : - return "fifo(%s)" % (self.path, ) - - def __del__ (self) : - """ - !!! this is important - """ - self.close() - diff -r eb0545ec03e7 -r 00907acd732a fixbot/irc.py --- a/fixbot/irc.py Sat Nov 06 16:01:42 2010 +0200 +++ b/fixbot/irc.py Sat Nov 06 16:02:28 2010 +0200 @@ -3,8 +3,6 @@ from twisted.python import log import traceback -import buffer - class ReplyException (Exception) : def __init__ (self, reply) : self.reply = reply diff -r eb0545ec03e7 -r 00907acd732a fixbot/logwatch/__init__.py --- a/fixbot/logwatch/__init__.py Sat Nov 06 16:01:42 2010 +0200 +++ b/fixbot/logwatch/__init__.py Sat Nov 06 16:02:28 2010 +0200 @@ -3,17 +3,17 @@ reformatting them before sending out to IRC. """ -from fixbot import api +from fixbot import module -class LogWatchModule (api.Module) : +class LogWatchModule (module.Module) : name = "logs" - def __init__ (self, config) : + def __init__ (self, config, protocol) : """ Initialize with logwatch_sources from config """ - - super(LogWatchModule, self).__init__(config) + + super(LogWatchModule, self).__init__(config, protocol) self.sources = config['logwatch-sources'] @@ -28,6 +28,6 @@ def error (self, msg) : self.sendEvent("error", msg) -def makeService (config) : - return api.makeService(LogWatchModule, config) +def makeService (config, protocol) : + return module.makeService(LogWatchModule, config, protocol) diff -r eb0545ec03e7 -r 00907acd732a fixbot/logwatch/fifo.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/fixbot/logwatch/fifo.py Sat Nov 06 16:02:28 2010 +0200 @@ -0,0 +1,90 @@ +# read a stream from a fifo + +from twisted.internet import reactor, interfaces +from twisted.python import log +from zope.interface import implements + +import os, fcntl, errno + +class EOF (Exception) : pass + +BUF_SIZE = 2048 + +class Fifo (object) : + implements(interfaces.IReadDescriptor) + + def __init__ (self, path) : + self.path = path + self.fd = None + + self._open() + + def _open (self) : + self.fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK) + + reactor.addReader(self) + + def close (self) : + if self.fd : + reactor.removeReader(self) + os.close(self.fd) + + self.fd = None + + def reopen (self) : + """ + Close and re-open the fifo. This is useful for handling EOF + """ + self.close() + self._open() + + def _read (self, length) : + + try : + data = os.read(self.fd, length) + + except OSError, e : + if e.errno == errno.EAGAIN : + return None + else : + raise + + if not data : + raise EOF() + + return data + + def fileno (self) : + return self.fd + + def doRead (self) : + while True : + try : + data = self._read(BUF_SIZE) + except EOF : + self.handleEOF() + return + + if data : + self.dataReceived(data) + else : + break + + def dataReceived (self, data) : + pass + + def handleEOF (self) : + pass + + def connectionLost (self, reason) : + self.close() + + def logPrefix (self) : + return "fifo(%s)" % (self.path, ) + + def __del__ (self) : + """ + !!! this is important + """ + self.close() + diff -r eb0545ec03e7 -r 00907acd732a fixbot/logwatch/sources.py --- a/fixbot/logwatch/sources.py Sat Nov 06 16:01:42 2010 +0200 +++ b/fixbot/logwatch/sources.py Sat Nov 06 16:02:28 2010 +0200 @@ -5,8 +5,7 @@ from twisted.internet import protocol, reactor from twisted.python import log -from fixbot import fifo -from fixbot.logwatch import message +from fixbot.logwatch import fifo, message class LogSource (object) : """ @@ -38,6 +37,13 @@ log.err(msg) self.module.error(msg) + def connectionLost (self, reason) : + """ + The transport we were connected to has dropped, possibly as a result of our handlers raising an error? + """ + + self.handleError("lost LogSource for %s: %s" % (self.name, reason.getErrorMessage())) + def handleData (self, data) : """ Feed binary data into the buffer, processing all lines via handleLine() @@ -110,6 +116,9 @@ else : raise ValueError(out) + def logPrefix (self) : + return "LogSource(%s)" % (self.name, ) + class File (LogSource, protocol.ProcessProtocol) : """ Stream lines from a regular file using /usr/bin/tail -f @@ -150,11 +159,7 @@ def handleEOF (self) : self.handleError("!!! EOF on fifo %s, re-opening" % self.name) - self.reopen() - - def connectionLost (self, reason) : - super(Fifo, self).connectionLost(reason) - self.handleError("lost fifo for %s: %s" % (self.name, reason.getErrorMessage())) + self.reopen() class UnixDatagramSocket (LogSource, protocol.DatagramProtocol) : """ @@ -181,6 +186,4 @@ # handle it as a line of data self.handleLine(data) - def logPrefix (self) : - return "LogSource(%s)" % (self.name, ) diff -r eb0545ec03e7 -r 00907acd732a fixbot/module.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/fixbot/module.py Sat Nov 06 16:02:28 2010 +0200 @@ -0,0 +1,131 @@ +from twisted.internet import protocol +from twisted.application import service, internet +from twisted.python import log + +import datetime + +class ModuleInfo (object) : + """ + Nexus-side handle on a Module + """ + + # module's name + name = None + + def __str__ (self) : + return "%s" % (self.name) + + def __repr__ (self) : + return "" % (self.name, ) + + +class Event (object) : + """ + An Event, sent by a Module to the Nexus, to be distributed further + """ + + # the ModuleInfo object + module = None + + # the event type as a string + type = None + + # event message as a string (up to 64k, although that won't fit onto IRC..) + msg = None + + # timestamp as a datetime.datetime + when = None + + def __init__ (self, module, type, msg) : + self.module = module + self.type = type + self.msg = msg + + self.when = datetime.datetime.now() + + def __str__ (self) : + return "[%s] %s" % (self.type, self.msg) + + def __repr__ (self) : + return "%s @ %s" % (self.type, self.when) + + +class Module (ModuleInfo, protocol.ClientFactory) : + """ + Module core, handles the API connection state and processes all messages + """ + + # our API connection to the Nexus + connection = None + + def __init__ (self, config, protocol) : + """ + config - configuration for connecting to nexus + protocol - API client protocol to use for this factory + """ + + self.protocol = protocol + + self.connection = None + + # XXX: legacy: self.secret = config['api-secret'] + + + def _onRegistered (self, connection) : + """ + Connected to nexus and registered + """ + + log.msg("Connected and registered") + + self.connection = connection + + # XXX: abort on errors? + self.handleConnect() + + + # XXX: unused, bad interface + def disconnect (self) : + """ + Disconnect from Nexus + """ + + self.connection.transport.loseConnection() + + + def sendEvent (self, type, msg) : + """ + Send event to nexus + """ + + self.connection.sendEvent(Event(self, type, msg)) + + + def handleConnect (self) : + """ + Do something once we are connected to nexus and registered + """ + + pass + + def abort (self, err) : + """ + Abort this module, disconnecting with the given error + """ + + self.connection.abort(str(err)) + +def makeService (module_class, config, protocol) : + s = service.MultiService() + + # build factory + factory = module_class(config, protocol) + + # the API client + log.msg("Connecting to API server on [%s:%d]" % (config['api-server'], config['api-port'])) + api_client = internet.TCPClient(config['api-server'], config['api-port'], factory) + + api_client.setServiceParent(s) + + return s + diff -r eb0545ec03e7 -r 00907acd732a fixbot/nexus.py --- a/fixbot/nexus.py Sat Nov 06 16:01:42 2010 +0200 +++ b/fixbot/nexus.py Sat Nov 06 16:02:28 2010 +0200 @@ -6,14 +6,23 @@ import irc, api class Nexus (object) : - def __init__ (self) : - """ - Must set .irc/.api attrs to irc.Factory/api.ServerFactory instances - """ + """ + The core component, which has the set of registered ModuleInfo's from api, and the IRC connection + """ + + # runtime config options as a dict + config = None + # the irc.Factory instance + irc = None + + # the api.ServerFactory instance + api = None + + def __init__ (self, config) : + self.config = config self.modules = dict() - def registerModule (self, module, transport) : self.modules[module.name] = (module, transport) @@ -36,7 +45,7 @@ return module, connection.transport.getPeer() def makeService (config) : - n = Nexus() + n = Nexus(config) s = service.MultiService() # the IRC side @@ -52,7 +61,7 @@ irc_client.setServiceParent(s) # the API side - n.api = api.ServerFactory(n, config['api-secret']) + n.api = api.ServerFactory(n, config) log.msg("Starting API server on [%s:%d]", config['api-port'], config['api-listen']) api_server = internet.TCPServer(config['api-port'], n.api, interface=config['api-listen']) @@ -61,10 +70,4 @@ # return the service collection return s - -if __name__ == '__main__' : - log.startLogging(sys.stderr) - nexus = Nexus() - reactor.run() - diff -r eb0545ec03e7 -r 00907acd732a fixbot/setup.py --- a/fixbot/setup.py Sat Nov 06 16:01:42 2010 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,10 +0,0 @@ -from distutils.core import setup, Extension - -_utmp = Extension('_utmp', sources=['_utmp.c']) - -setup( - name = "FixBot", - version = "0.1", - description = "IRC bot for sysadmin things", - ext_modules = [_utmp], -) diff -r eb0545ec03e7 -r 00907acd732a fixbot/tap.py --- a/fixbot/tap.py Sat Nov 06 16:01:42 2010 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,25 +0,0 @@ -from twisted.python import usage - -import nexus - -class NexusOptions (usage.Options) : - optParameters = [ - ( "uid", "u", "fixbot", "user to run as" ), - ( "gid", "g", "nogroup", "group to run as" ), - ( "irc-hostname", "s", "irc.fixme.fi", "IRC server hostname", ), - ( "irc-port", "p", 6667, "IRC server port", int ), - ( "irc-nickname", "n", "FixBotDev", "IRC nickname", ), - ( "irc-username", "U", "fixbot", "IRC username", ), - ( "irc-channel", "c", "#fixme-test", "IRC channel", ), - ( "api-listen" "l", "127.0.0.1", "address for API server to listen on" ), - ( "api-port", "P", 34888, "port for API server to listen on", int ), - ] - - optFlags = [ - - ] - -def makeService (config) : - # app = service.Application('fixbot', uid=config['uid'], gid=config['gid']) - return nexus.makeService(config) - diff -r eb0545ec03e7 -r 00907acd732a setup.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/setup.py Sat Nov 06 16:02:28 2010 +0200 @@ -0,0 +1,10 @@ +from distutils.core import setup, Extension + +_utmp = Extension('fixbot._utmp', sources=['fixbot/_utmp.c']) + +setup( + name = "FixBot", + version = "0.1", + description = "IRC bot for sysadmin things", + ext_modules = [_utmp], +) diff -r eb0545ec03e7 -r 00907acd732a twisted/plugins/fixbot_logwatch_plugin.py --- a/twisted/plugins/fixbot_logwatch_plugin.py Sat Nov 06 16:01:42 2010 +0200 +++ b/twisted/plugins/fixbot_logwatch_plugin.py Sat Nov 06 16:02:28 2010 +0200 @@ -6,7 +6,7 @@ from twisted.application import internet -from fixbot import api +from fixbot import api, module from fixbot import logwatch class LogwatchOptions (api.ClientOptions) : @@ -25,7 +25,7 @@ options = LogwatchOptions def makeService (self, config) : - return logwatch.makeService(config) + return logwatch.makeService(config, api.amp.ClientProtocol) serviceMaker = MyServiceMaker() diff -r eb0545ec03e7 -r 00907acd732a twisted/plugins/fixbot_nexus_plugin.py --- a/twisted/plugins/fixbot_nexus_plugin.py Sat Nov 06 16:01:42 2010 +0200 +++ b/twisted/plugins/fixbot_nexus_plugin.py Sat Nov 06 16:02:28 2010 +0200 @@ -6,9 +6,9 @@ from twisted.application import internet -from fixbot import config, nexus +from fixbot import config, nexus, api -class NexusOptions (config.ConfigOptions) : +class NexusOptions (api.ServerOptions) : optParameters = [ ( "irc-server", "s", "irc.fixme.fi", "IRC server hostname", ), @@ -16,9 +16,6 @@ ( "irc-nickname", "N", "FixBotDev", "IRC nickname", ), ( "irc-username", "U", "fixbot", "IRC username", ), ( "irc-channel", "C", "#fixme-test", "IRC channel", ), - ( "api-listen", "l", "127.0.0.1", "address for API server to listen on" ), - ( "api-port", "P", 34888, "port for API server to listen on", int ), - ( "api-secret", None, None, "secret key for API connections" ), ] optFlags = [ @@ -32,8 +29,8 @@ options = NexusOptions def makeService (self, config) : - if config['api-secret'] is None : - raise usage.UsageError("No value given for required option api-secret") + #if config['api-secret'] is None : + # raise usage.UsageError("No value given for required option api-secret") return nexus.makeService(config)