--- a/etc/fixbot-logwatch.py Sat Nov 06 16:01:42 2010 +0200
+++ b/etc/fixbot-logwatch.py Sat Nov 06 16:02:28 2010 +0200
@@ -18,10 +18,10 @@
filters.su_nobody_killer,
filters.all,
)),
- UnixDatagramSocket("test", os.path.join(logwatch_dir, "test.sock"), (
- filters.sudo,
- filters.ssh,
- filters.cron_killer,
- filters.all,
- )),
+ # UnixDatagramSocket("test", os.path.join(logwatch_dir, "test.sock"), (
+ # filters.sudo,
+ # filters.ssh,
+ # filters.cron_killer,
+ # filters.all,
+ # )),
)
--- a/fixbot/api.py Sat Nov 06 16:01:42 2010 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,213 +0,0 @@
-from twisted.application import internet, service
-from twisted.internet import protocol, reactor
-from twisted.python import log, usage
-from datetime import datetime
-import sys
-
-from fixbot import buffer, config
-
-class ModuleInfo (object) :
- """
- Some info about a module
- """
-
- # module's name
- name = None
-
- def __str__ (self) :
- return "Module %s:" % (self.name)
-
- def __repr__ (self) :
- return "<module %s>" % (self.name, )
-
-class Event (object) :
- # the ModuleInfo object
- module = None
-
- # the event type as a string
- type = None
-
- # event message as a string (up to 64k, although that won't fit onto IRC..)
- msg = None
-
- # timestamp as a datetime.datetime
- when = None
-
- def __init__ (self, module, type, msg) :
- self.module = module
- self.type = type
- self.msg = msg
-
- self.when = datetime.now()
-
- def __str__ (self) :
- return "[%s] %s" % (self.type, self.msg)
-
- def __repr__ (self) :
- return "%s @ %s" % (self.type, self.when)
-
-CLIENT_COMMANDS = [
- "module_init",
- "module_event",
-]
-
-SERVER_COMMANDS = [
- "module_ok",
-]
-
-class ServerProtocol (buffer.StreamProtocol, protocol.Protocol) :
- RECV_COMMANDS = CLIENT_COMMANDS
- SEND_COMMANDS = SERVER_COMMANDS
-
- VALID_STATES = [
- "wait_init",
- "wait_event"
- ]
-
- # proto state
- state = None
-
- # module info
- module = None
-
- def _assert (self, condition, msg) :
- if not condition :
- self.transport.loseConnection()
- log.err("assert failed in APIProtocol for %s: %s" % (self.module, msg))
-
- def connectionMade (self) :
- log.msg("Client connected")
-
- def connectionLost (self, reason) :
- log.msg("Connection lost: %s" % reason)
-
- if self.module :
- self.factory.nexus.unregisterModule(self.module, reason.getErrorMessage())
-
- def on_module_init (self, i) :
- self._assert(not self.module, "module_init with non-None self.module")
-
- peer_secret = i.readVarLen('B')
-
- self._assert(peer_secret == self.factory.secret, "Mismatching API secrets!")
-
- m = ModuleInfo()
-
- m.name = i.readVarLen('B')
- m.addr = self.transport.getPeer()
-
- log.msg("Got mod_init for %r" % m)
-
- self.factory.nexus.registerModule(m, self)
-
- self.module = m
-
- o = self.startCommand('module_ok')
-
- self.send(o)
-
- def on_module_event (self, i) :
- self._assert(self.module, "module_event with None self.module!")
-
- event_type = i.readVarLen('B')
- event_msg = i.readVarLen('H')
-
- e = Event(self.module, event_type, event_msg)
-
- self.factory.nexus.handleEvent(e)
-
- def logPrefix (self) :
- if self.module :
- return str(self.module)
- else :
- return super(ServerProtocol, self).logPrefix()
-
-class ClientProtocol (buffer.StreamProtocol, protocol.Protocol) :
- RECV_COMMANDS = SERVER_COMMANDS
- SEND_COMMANDS = CLIENT_COMMANDS
-
- def connectionMade (self) :
- log.msg("Connected to API server, sending module init message")
-
- o = self.startCommand('module_init')
- o.writeVarLen('B', self.factory.secret)
- o.writeVarLen('B', self.factory.name)
-
- self.send(o)
-
- def sendEvent (self, event) :
- o = self.startCommand('module_event')
- o.writeVarLen('B', event.type)
- o.writeVarLen('H', event.msg[:2**16])
-
- self.send(o)
-
- def on_module_ok (self, i) :
- log.msg("Registration OK")
-
- self.factory.connected(self)
-
- def logPrefix (self) :
- return "module %s client" % (self.factory.name)
-
-class Module (ModuleInfo, protocol.ClientFactory) :
- protocol = ClientProtocol
-
- def __init__ (self, config) :
- self.connection = None
- self.secret = config['api-secret']
-
- def connected (self, connection) :
- log.msg("Connected!")
- self.connection = connection
-
- self.handleConnect()
-
- def disconnect (self) :
- self.connection.transport.loseConnection()
-
- def sendEvent (self, type, msg) :
- self.connection.sendEvent(self.buildEvent(type, msg))
-
- def buildEvent (self, type, msg) :
- return Event(self, type, msg)
-
- def handleConnect (self) :
- """
- Do something
- """
-
- pass
-
-class ServerFactory (protocol.ServerFactory) :
- protocol = ServerProtocol
-
- def __init__ (self, nexus, secret) :
- self.nexus = nexus
- self.secret = secret
-
-class ClientOptions (config.ConfigOptions) :
- optParameters = [
- ( "api-server", "s", "127.0.0.1", "address of API server to connect to" ),
- ( "api-port", "P", 34888, "port of API server to connect to", int ),
- ( "api-secret", None, None, "secret key for API connections" ),
- ]
-
- optFlags = [
-
- ]
-
-def makeService (module_class, config) :
- s = service.MultiService()
-
- # build factory
- factory = module_class(config)
-
- # the API client
- log.msg("Connecting to API server on [%s:%d]" % (config['api-server'], config['api-port']))
- api_client = internet.TCPClient(config['api-server'], config['api-port'], factory)
-
- api_client.setServiceParent(s)
-
- return s
-
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/fixbot/api/__init__.py Sat Nov 06 16:02:28 2010 +0200
@@ -0,0 +1,37 @@
+from twisted.internet import protocol
+
+from fixbot import config
+from fixbot.api import amp
+
+class ServerOptions (config.ConfigOptions) :
+ """
+ Options for the API component of the Nexus
+ """
+
+ optParameters = [
+ ( "api-listen", "l", "127.0.0.1", "address for API server to listen on" ),
+ ( "api-port", "P", 34888, "port for API server to listen on", int ),
+ ( "api-secret", None, None, "secret key for API connections" ),
+ ]
+
+class ClientOptions (config.ConfigOptions) :
+ """
+ Options for the API component of a Module
+ """
+
+ optParameters = [
+ ( "api-server", "s", "127.0.0.1", "address of API server to connect to" ),
+ ( "api-port", "P", 34888, "port of API server to connect to", int ),
+ ]
+
+class ServerFactory (protocol.ServerFactory) :
+ def __init__ (self, nexus, config) :
+ self.nexus = nexus
+
+ # protocol to use
+ self.protocol = amp.ServerProtocol
+
+ # XXX: only used for legacy
+ self.secret = config.get('api-secret')
+
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/fixbot/api/amp.py Sat Nov 06 16:02:28 2010 +0200
@@ -0,0 +1,155 @@
+from twisted.protocols import amp
+from twisted.python import log
+
+from fixbot.module import ModuleInfo, Event
+
+class CmdModuleRegister (amp.Command) :
+ """
+ Register module
+ """
+
+ arguments = [
+ ('name', amp.String()),
+ ]
+
+class CmdModuleEvent (amp.Command) :
+ """
+ Broadcast event
+ """
+
+ arguments = [
+ ('type', amp.String()),
+ ('msg', amp.String()),
+ ]
+
+class CmdModuleAbort (amp.Command) :
+ """
+ Module has failed and will now disconnect
+ """
+
+ arguments = [
+ ('msg', amp.String()) # string describing the error occuring
+ ]
+
+ requiresAnswer = False
+
+class ServerProtocol (amp.AMP) :
+ """
+ Nexus-side command handler
+ """
+
+ # the registered ModuleInfo
+ module = None
+
+ def connectionMade (self) :
+ log.msg("Module connecting from: %s" % (self.transport.getPeer()))
+
+
+ def connectionLost (self, reason) :
+ log.err(reason, "Module lost")
+
+ if self.module :
+ # drop it
+ self.factory.nexus.unregisterModule(self.module, reason.getErrorMessage())
+
+
+ @CmdModuleRegister.responder
+ def on_ModuleRegister (self, name) :
+ # construct the ModuleInfo
+ mi = ModuleInfo()
+ mi.name = name
+
+ log.msg("Module registered: %s" % (mi))
+
+ # register
+ self.factory.nexus.registerModule(mi, self)
+ self.module = mi
+
+ # ok
+ return {}
+
+ @CmdModuleEvent.responder
+ def on_ModuleEvent (self, type, msg) :
+ # as Event
+ e = Event(self.module, type, msg)
+
+ # publish
+ self.factory.nexus.handleEvent(e)
+
+ # ok
+ return {}
+
+ @CmdModuleAbort.responder
+ def on_ModuleAbort (self, msg) :
+ # unhook
+ module = self.module
+ self.module = None
+
+ # report
+ self.factory.nexus.unregisterModule(self.module, msg)
+
+ # XXX: stop accepting commands etc?
+
+class ClientProtocol (amp.AMP) :
+ """
+ Module-side command sender/handler
+ """
+
+
+ def connectionMade (self) :
+ """
+ Connected to nexus, send ModuleRegister
+ """
+
+ # register
+ self.sendModuleRegister(self.factory.name).addCallback(self._ModuleRegisterOK)
+
+ def connectionLost (self, reason) :
+ """
+ Disconnected from nexus, for whatever reason...
+ """
+
+ log.err(reason, "API connection lost")
+
+ # XXX: was this expected? Reconnect?
+
+ def sendModuleRegister (self, name) :
+ """
+ Register with given module name
+ """
+
+ return self.callRemote(CmdModuleRegister, name=name)
+
+
+ def _ModuleRegisterOK (self, ret) :
+ """
+ Registered with nexus, commence operation
+ """
+
+ self.factory._onRegistered(self)
+
+
+ def sendEvent (self, event) :
+ """
+ Broadcast event to nexus
+ """
+
+ self.callRemote(CmdModuleEvent, type=event.type, msg=event.msg)
+
+ def sendModuleAbort (self, msg) :
+ """
+ Send CmdModuleAbort - no response is expected
+ """
+
+ self.callRemote(CmdModuleAbort, msg=msg)
+
+
+ def abort (self, msg) :
+ """
+ Send abort message and drop connection
+ """
+
+ # disconnect. This should leave the transport to flush buffers, and then call connectionLost
+ # should also stop us from sending any more commands
+ self.transport.loseConnection()
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/fixbot/api/buffer.py Sat Nov 06 16:02:28 2010 +0200
@@ -0,0 +1,417 @@
+try:
+ from cStringIO import StringIO
+except ImportError:
+ from StringIO import StringIO
+import struct
+
+# prefixed to all struct format strings
+STRUCT_PREFIX = '!'
+
+def hex (bytes) :
+ return ' '.join(['%#04x' % ord(b) for b in bytes])
+
+class NotEnoughDataError (Exception) :
+ pass
+
+class IStreamBase (object) :
+ # prefixed to all struct format strings
+ STRUCT_PREFIX = '!'
+
+class IReadStream (IStreamBase) :
+ """
+ IReadStream simply provides various interfaces for reading bytes from a
+ stream in various ways
+ """
+
+ def read (self, size=None) :
+ """
+ Read and return up to the given amount of bytes, or all bytes
+ available if no size given.
+ """
+
+ abstract
+
+ def readStruct (self, fmt) :
+ """
+ Reads the correct amount of data and then unpacks it according to
+ the given format. Note that this always returns a tuple, for single
+ items, use readItem
+ """
+
+ fmt = self.STRUCT_PREFIX + fmt
+
+ fmt_size = struct.calcsize(fmt)
+ data = self.read(fmt_size)
+
+ return struct.unpack(fmt, data)
+
+ def readItem (self, fmt) :
+ """
+ Reads the correct amount of data, unpacks it according to the
+ given format, and then returns the first item.
+ """
+
+ return self.readStruct(fmt)[0]
+
+ def readVarLen (self, len_type) :
+ """
+ Return the data part of a <length><data> structure.
+ len_type indicates what type length has (struct format code).
+
+ In the case of <length> being zero, returns an empty string.
+ """
+
+ size = self.readItem(len_type)
+
+ if size :
+ return self.read(size)
+ else :
+ return ""
+
+ def readEnum (self, enum) :
+ """
+ Returns the item from the given list of enum values that corresponds
+ to a single-byte value read from the stream
+ """
+
+ return enum[self.readItem('B')]
+
+class ISeekableStream (IStreamBase) :
+ """
+ Extends IStreamBase to provide the ability to seek backwards into the
+ stream (which still does not know it's length, and thence cannot seek
+ forwards).
+ """
+
+ _position = None
+
+ def tell (self) :
+ """
+ Return the current offset into the seekable stream
+ """
+
+ abstract
+
+ def seek (self, pos) :
+ """
+ Seek to the given position in the stream.
+ """
+
+ abstract
+
+ def mark (self) :
+ """
+ Set a mark that can be later rolled back to with .reset()
+ """
+
+ self._position = self.tell()
+
+ def unmark (self) :
+ """
+ Remove the mark without affecting the current position
+ """
+
+ self._position = None
+
+ def reset (self) :
+ """
+ Rolls the buffer back to the position set earlier with mark()
+ """
+
+ if self._position is not None :
+ self.seek(self._position)
+ self._position = None
+
+ else :
+ raise Exception("reset() without mark()")
+
+class ISeekableReadStream (ISeekableStream, IReadStream) :
+ def peek (self, len=None) :
+ """
+ Return a string representing what buf.read(len) would return, but do
+ not affect future read operations
+ """
+
+ pos = self.tell()
+
+ data = self.read(len)
+
+ self.seek(pos)
+
+ return data
+
+
+class INonBlockingReadStream (IReadStream) :
+ """
+ Otherwise identical to IReadStream, but read will either return size
+ bytes, or raise a NotEnoughDataError
+ """
+
+ pass
+
+class IWriteStream (IStreamBase) :
+ """
+ IWriteStream provides various ways to write data to a byte stream
+ """
+
+ def write (self, data) :
+ """
+ Write the given bytes to the stream
+ """
+
+ abstract
+
+ def writeStruct (self, fmt, *args) :
+ """
+ Pack the given arguments with the given struct format, and write it
+ to the stream.
+ """
+
+ self.write(struct.pack(self.STRUCT_PREFIX + fmt, *args))
+
+ writeItem = writeStruct
+
+ def writeVarLen (self, len_type, data) :
+ """
+ Write a <length><data> field into the buffer. Len_type is the
+ struct format code for the length field.
+ """
+
+ self.writeStruct(len_type, len(data))
+ self.write(data)
+
+ def writeEnum (self, enum, name) :
+ """
+ Write the single-byte value correspnding to the given name's
+ position in the given enum
+ """
+
+ self.writeStruct('B', enum.index(name))
+
+class IBufferBase (ISeekableStream) :
+ """
+ A buffer simply provides a way to read and write data to/from a byte
+ sequence stored in memory.
+ """
+
+ def tell (self) :
+ return self._buf.tell()
+
+ def seek (self, offset) :
+ return self._buf.seek(offset)
+
+ def getvalue (self) :
+ """
+ Returns the value of the buffer, i.e. a string with the contents of
+ the buffer from position zero to the end.
+ """
+
+ return self._buf.getvalue()
+
+class ReadBuffer (INonBlockingReadStream, ISeekableReadStream, IBufferBase) :
+ """
+ A read-only buffer. Can be initialized with a given value and then later
+ replaced in various ways, but cannot be modified.
+ """
+
+ def __init__ (self, data="") :
+ """
+ Initialize the buffer with the given data
+ """
+
+ self._buf = StringIO(data)
+
+ def read (self, size=None) :
+ """
+ Return the given number of bytes, or raise a NotEnoughDataError
+ """
+
+ if size == 0 :
+ raise ValueError("can't read zero bytes")
+
+ if size :
+ ret = self._buf.read(size)
+ else :
+ ret = self._buf.read()
+
+ if size and len(ret) < size :
+ raise NotEnoughDataError()
+
+ return ret
+
+ def append (self, data) :
+ """
+ Modify the buffer such that it contains the old data from this
+ buffer, and the given data at the end. The read position in the buffer
+ is kept the same.
+ """
+
+ pos = self.tell()
+
+ self._buf = StringIO(self._buf.getvalue() + data)
+
+ self.seek(pos)
+
+ def chop (self) :
+ """
+ Discard the data in the buffer before the current read position.
+ Also removes any marks.
+ """
+
+ self._position = None
+
+ self._buf = StringIO(self.read())
+
+ def processWith (self, func) :
+ """
+ Call the given function with this buffer as an argument after
+ calling mark(). If the function
+ a) returns None, the buffer is .chop()'d, and we repeat the
+ process.
+ b) raises a NotEnoughDataError, whereupon the buffer is rolled
+ back to where it was before calling the function with
+ chop().
+ c) raises a StopIteration, whereupon we chop the buffer and
+ return.
+ d) returns something (i.e. ret is not None), whereupon we
+ return that (and leave the current buffer position untouched).
+ """
+ ret = None
+
+ try :
+ while ret is None :
+ self.mark() # mark the position of the packet we are processing
+ ret = func(self)
+
+ if ret is None :
+ # discard the processed packet and proceed to the next one
+ self.chop()
+
+ except NotEnoughDataError, e :
+ self.reset() # reset position back to the start of the packet
+ return e
+
+ except StopIteration, e:
+ self.chop()
+ return e # processed ok, but we don't want to process any further packets
+
+ else :
+ return ret
+
+class WriteBuffer (IWriteStream, IBufferBase) :
+ """
+ A write-only buffer. Data can be written to this buffer in various
+ ways, but cannot be read from it except as a whole.
+ """
+
+ def __init__ (self) :
+ """
+ Initialize the buffer
+ """
+
+ self._buf = StringIO()
+
+ def write (self, data) :
+ """
+ Write the given data to the current position in the stream,
+ overwriting any previous data in that position, and extending
+ the buffer if needed
+ """
+
+ return self._buf.write(data)
+
+def readStringStream (stream, varlen_type) :
+ """
+ Does readVarLen on an IReadStream until it returns something that evaluates to false ( == zero-length string)
+ """
+
+ while True :
+ item = stream.readVarLen(varlen_type)
+
+ if item :
+ yield item
+ else :
+ return
+
+def writeStringStream (stream, varlen_type, strings) :
+ """
+ Writes strings from the given iterable into the given stream using the given varlen_type, ending with a null-length token
+ """
+
+ for item in strings :
+ stream.writeVarLen(varlen_type, item)
+
+ stream.writeItem(varlen_type, 0)
+
+class StreamProtocol (object) :
+ """
+ A mixin to let you use Buffer with twisted.internet.protocol.Protocol
+ """
+
+ # a list of receivable command names
+ RECV_COMMANDS = None
+
+ # a list of sendable command names
+ SEND_COMMANDS = None
+
+ def __init__ (self) :
+ """
+ Initialize the cross-dataReceived buffer
+ """
+
+ self.in_buffer = ReadBuffer()
+
+ def send (self, buf) :
+ """
+ Write the contents of the given WriteBuffer to the transport
+ """
+
+ self.transport.write(buf.getvalue())
+
+ def dataReceived (self, data) :
+ """
+ Buffer the incoming data and then try and process it
+ """
+
+ self.in_buffer.append(data)
+
+ ret = self.in_buffer.processWith(self.processPacket)
+
+ def processPacket (self, buf) :
+ """
+ Call processCommand with the buffer, handling the return value (either None or a deferred)
+ """
+
+ ret = self.processCommand(buf)
+
+ if ret :
+ ret.addCallback(self.send)
+
+ def processCommand (self, buf) :
+ """
+ Process a command from the given buffer. May return a callback
+ """
+
+ return self.readMethod(buf, self.RECV_COMMANDS, buf)
+
+ # conveniance read/write
+ def startCommand (self, cmd) :
+ buf = WriteBuffer()
+
+ buf.writeEnum(self.SEND_COMMANDS, cmd)
+
+ return buf
+
+ def readMethod (self, buf, methods, *args, **kwargs) :
+ """
+ Reads a single-byte <methods>-enum value from the given buffer and
+ use it to find the corresponding method (as <prefix>_<method-name>,
+ prefix can be overriden with a keyword argument and defaults to
+ 'on'. If any extra arguments are given, they will be passed to the
+ method.
+ """
+
+ prefix = kwargs.pop("prefix", "on")
+
+ return getattr(self, "%s_%s" % (prefix, buf.readEnum(methods)))(*args, **kwargs)
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/fixbot/api/legacy.py Sat Nov 06 16:02:28 2010 +0200
@@ -0,0 +1,111 @@
+from twisted.internet import protocol
+from twisted.python import log
+
+from fixbot import buffer
+from fixbot.module import ModuleInfo, Event
+
+
+CLIENT_COMMANDS = [
+ "module_init",
+ "module_event",
+]
+
+SERVER_COMMANDS = [
+ "module_ok",
+]
+
+class ServerProtocol (buffer.StreamProtocol, protocol.Protocol) :
+ RECV_COMMANDS = CLIENT_COMMANDS
+ SEND_COMMANDS = SERVER_COMMANDS
+
+ VALID_STATES = [
+ "wait_init",
+ "wait_event"
+ ]
+
+ # proto state
+ state = None
+
+ # module info
+ module = None
+
+ def _assert (self, condition, msg) :
+ if not condition :
+ self.transport.loseConnection()
+ log.err("assert failed in APIProtocol for %s: %s" % (self.module, msg))
+
+ def connectionMade (self) :
+ log.msg("Client connected")
+
+ def connectionLost (self, reason) :
+ log.msg("Connection lost: %s" % reason)
+
+ if self.module :
+ self.factory.nexus.unregisterModule(self.module, reason.getErrorMessage())
+
+ def on_module_init (self, i) :
+ self._assert(not self.module, "module_init with non-None self.module")
+
+ peer_secret = i.readVarLen('B')
+
+ self._assert(peer_secret == self.factory.secret, "Mismatching API secrets!")
+
+ m = ModuleInfo()
+
+ m.name = i.readVarLen('B')
+ m.addr = self.transport.getPeer()
+
+ log.msg("Got mod_init for %r" % m)
+
+ self.factory.nexus.registerModule(m, self)
+
+ self.module = m
+
+ o = self.startCommand('module_ok')
+
+ self.send(o)
+
+ def on_module_event (self, i) :
+ self._assert(self.module, "module_event with None self.module!")
+
+ event_type = i.readVarLen('B')
+ event_msg = i.readVarLen('H')
+
+ e = Event(self.module, event_type, event_msg)
+
+ self.factory.nexus.handleEvent(e)
+
+ def logPrefix (self) :
+ if self.module :
+ return str(self.module)
+ else :
+ return super(ServerProtocol, self).logPrefix()
+
+class ClientProtocol (buffer.StreamProtocol, protocol.Protocol) :
+ RECV_COMMANDS = SERVER_COMMANDS
+ SEND_COMMANDS = CLIENT_COMMANDS
+
+ def connectionMade (self) :
+ log.msg("Connected to API server, sending module init message")
+
+ o = self.startCommand('module_init')
+ o.writeVarLen('B', self.factory.secret)
+ o.writeVarLen('B', self.factory.name)
+
+ self.send(o)
+
+ def sendEvent (self, event) :
+ o = self.startCommand('module_event')
+ o.writeVarLen('B', event.type)
+ o.writeVarLen('H', event.msg[:2**16])
+
+ self.send(o)
+
+ def on_module_ok (self, i) :
+ log.msg("Registration OK")
+
+ self.factory._onRegistered(self)
+
+ def logPrefix (self) :
+ return "module %s client" % (self.factory.name)
+
--- a/fixbot/buffer.py Sat Nov 06 16:01:42 2010 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,417 +0,0 @@
-try:
- from cStringIO import StringIO
-except ImportError:
- from StringIO import StringIO
-import struct
-
-# prefixed to all struct format strings
-STRUCT_PREFIX = '!'
-
-def hex (bytes) :
- return ' '.join(['%#04x' % ord(b) for b in bytes])
-
-class NotEnoughDataError (Exception) :
- pass
-
-class IStreamBase (object) :
- # prefixed to all struct format strings
- STRUCT_PREFIX = '!'
-
-class IReadStream (IStreamBase) :
- """
- IReadStream simply provides various interfaces for reading bytes from a
- stream in various ways
- """
-
- def read (self, size=None) :
- """
- Read and return up to the given amount of bytes, or all bytes
- available if no size given.
- """
-
- abstract
-
- def readStruct (self, fmt) :
- """
- Reads the correct amount of data and then unpacks it according to
- the given format. Note that this always returns a tuple, for single
- items, use readItem
- """
-
- fmt = self.STRUCT_PREFIX + fmt
-
- fmt_size = struct.calcsize(fmt)
- data = self.read(fmt_size)
-
- return struct.unpack(fmt, data)
-
- def readItem (self, fmt) :
- """
- Reads the correct amount of data, unpacks it according to the
- given format, and then returns the first item.
- """
-
- return self.readStruct(fmt)[0]
-
- def readVarLen (self, len_type) :
- """
- Return the data part of a <length><data> structure.
- len_type indicates what type length has (struct format code).
-
- In the case of <length> being zero, returns an empty string.
- """
-
- size = self.readItem(len_type)
-
- if size :
- return self.read(size)
- else :
- return ""
-
- def readEnum (self, enum) :
- """
- Returns the item from the given list of enum values that corresponds
- to a single-byte value read from the stream
- """
-
- return enum[self.readItem('B')]
-
-class ISeekableStream (IStreamBase) :
- """
- Extends IStreamBase to provide the ability to seek backwards into the
- stream (which still does not know it's length, and thence cannot seek
- forwards).
- """
-
- _position = None
-
- def tell (self) :
- """
- Return the current offset into the seekable stream
- """
-
- abstract
-
- def seek (self, pos) :
- """
- Seek to the given position in the stream.
- """
-
- abstract
-
- def mark (self) :
- """
- Set a mark that can be later rolled back to with .reset()
- """
-
- self._position = self.tell()
-
- def unmark (self) :
- """
- Remove the mark without affecting the current position
- """
-
- self._position = None
-
- def reset (self) :
- """
- Rolls the buffer back to the position set earlier with mark()
- """
-
- if self._position is not None :
- self.seek(self._position)
- self._position = None
-
- else :
- raise Exception("reset() without mark()")
-
-class ISeekableReadStream (ISeekableStream, IReadStream) :
- def peek (self, len=None) :
- """
- Return a string representing what buf.read(len) would return, but do
- not affect future read operations
- """
-
- pos = self.tell()
-
- data = self.read(len)
-
- self.seek(pos)
-
- return data
-
-
-class INonBlockingReadStream (IReadStream) :
- """
- Otherwise identical to IReadStream, but read will either return size
- bytes, or raise a NotEnoughDataError
- """
-
- pass
-
-class IWriteStream (IStreamBase) :
- """
- IWriteStream provides various ways to write data to a byte stream
- """
-
- def write (self, data) :
- """
- Write the given bytes to the stream
- """
-
- abstract
-
- def writeStruct (self, fmt, *args) :
- """
- Pack the given arguments with the given struct format, and write it
- to the stream.
- """
-
- self.write(struct.pack(self.STRUCT_PREFIX + fmt, *args))
-
- writeItem = writeStruct
-
- def writeVarLen (self, len_type, data) :
- """
- Write a <length><data> field into the buffer. Len_type is the
- struct format code for the length field.
- """
-
- self.writeStruct(len_type, len(data))
- self.write(data)
-
- def writeEnum (self, enum, name) :
- """
- Write the single-byte value correspnding to the given name's
- position in the given enum
- """
-
- self.writeStruct('B', enum.index(name))
-
-class IBufferBase (ISeekableStream) :
- """
- A buffer simply provides a way to read and write data to/from a byte
- sequence stored in memory.
- """
-
- def tell (self) :
- return self._buf.tell()
-
- def seek (self, offset) :
- return self._buf.seek(offset)
-
- def getvalue (self) :
- """
- Returns the value of the buffer, i.e. a string with the contents of
- the buffer from position zero to the end.
- """
-
- return self._buf.getvalue()
-
-class ReadBuffer (INonBlockingReadStream, ISeekableReadStream, IBufferBase) :
- """
- A read-only buffer. Can be initialized with a given value and then later
- replaced in various ways, but cannot be modified.
- """
-
- def __init__ (self, data="") :
- """
- Initialize the buffer with the given data
- """
-
- self._buf = StringIO(data)
-
- def read (self, size=None) :
- """
- Return the given number of bytes, or raise a NotEnoughDataError
- """
-
- if size == 0 :
- raise ValueError("can't read zero bytes")
-
- if size :
- ret = self._buf.read(size)
- else :
- ret = self._buf.read()
-
- if size and len(ret) < size :
- raise NotEnoughDataError()
-
- return ret
-
- def append (self, data) :
- """
- Modify the buffer such that it contains the old data from this
- buffer, and the given data at the end. The read position in the buffer
- is kept the same.
- """
-
- pos = self.tell()
-
- self._buf = StringIO(self._buf.getvalue() + data)
-
- self.seek(pos)
-
- def chop (self) :
- """
- Discard the data in the buffer before the current read position.
- Also removes any marks.
- """
-
- self._position = None
-
- self._buf = StringIO(self.read())
-
- def processWith (self, func) :
- """
- Call the given function with this buffer as an argument after
- calling mark(). If the function
- a) returns None, the buffer is .chop()'d, and we repeat the
- process.
- b) raises a NotEnoughDataError, whereupon the buffer is rolled
- back to where it was before calling the function with
- chop().
- c) raises a StopIteration, whereupon we chop the buffer and
- return.
- d) returns something (i.e. ret is not None), whereupon we
- return that (and leave the current buffer position untouched).
- """
- ret = None
-
- try :
- while ret is None :
- self.mark() # mark the position of the packet we are processing
- ret = func(self)
-
- if ret is None :
- # discard the processed packet and proceed to the next one
- self.chop()
-
- except NotEnoughDataError, e :
- self.reset() # reset position back to the start of the packet
- return e
-
- except StopIteration, e:
- self.chop()
- return e # processed ok, but we don't want to process any further packets
-
- else :
- return ret
-
-class WriteBuffer (IWriteStream, IBufferBase) :
- """
- A write-only buffer. Data can be written to this buffer in various
- ways, but cannot be read from it except as a whole.
- """
-
- def __init__ (self) :
- """
- Initialize the buffer
- """
-
- self._buf = StringIO()
-
- def write (self, data) :
- """
- Write the given data to the current position in the stream,
- overwriting any previous data in that position, and extending
- the buffer if needed
- """
-
- return self._buf.write(data)
-
-def readStringStream (stream, varlen_type) :
- """
- Does readVarLen on an IReadStream until it returns something that evaluates to false ( == zero-length string)
- """
-
- while True :
- item = stream.readVarLen(varlen_type)
-
- if item :
- yield item
- else :
- return
-
-def writeStringStream (stream, varlen_type, strings) :
- """
- Writes strings from the given iterable into the given stream using the given varlen_type, ending with a null-length token
- """
-
- for item in strings :
- stream.writeVarLen(varlen_type, item)
-
- stream.writeItem(varlen_type, 0)
-
-class StreamProtocol (object) :
- """
- A mixin to let you use Buffer with twisted.internet.protocol.Protocol
- """
-
- # a list of receivable command names
- RECV_COMMANDS = None
-
- # a list of sendable command names
- SEND_COMMANDS = None
-
- def __init__ (self) :
- """
- Initialize the cross-dataReceived buffer
- """
-
- self.in_buffer = ReadBuffer()
-
- def send (self, buf) :
- """
- Write the contents of the given WriteBuffer to the transport
- """
-
- self.transport.write(buf.getvalue())
-
- def dataReceived (self, data) :
- """
- Buffer the incoming data and then try and process it
- """
-
- self.in_buffer.append(data)
-
- ret = self.in_buffer.processWith(self.processPacket)
-
- def processPacket (self, buf) :
- """
- Call processCommand with the buffer, handling the return value (either None or a deferred)
- """
-
- ret = self.processCommand(buf)
-
- if ret :
- ret.addCallback(self.send)
-
- def processCommand (self, buf) :
- """
- Process a command from the given buffer. May return a callback
- """
-
- return self.readMethod(buf, self.RECV_COMMANDS, buf)
-
- # conveniance read/write
- def startCommand (self, cmd) :
- buf = WriteBuffer()
-
- buf.writeEnum(self.SEND_COMMANDS, cmd)
-
- return buf
-
- def readMethod (self, buf, methods, *args, **kwargs) :
- """
- Reads a single-byte <methods>-enum value from the given buffer and
- use it to find the corresponding method (as <prefix>_<method-name>,
- prefix can be overriden with a keyword argument and defaults to
- 'on'. If any extra arguments are given, they will be passed to the
- method.
- """
-
- prefix = kwargs.pop("prefix", "on")
-
- return getattr(self, "%s_%s" % (prefix, buf.readEnum(methods)))(*args, **kwargs)
-
--- a/fixbot/fifo.py Sat Nov 06 16:01:42 2010 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,90 +0,0 @@
-# read a stream from a fifo
-
-from twisted.internet import reactor, interfaces
-from twisted.python import log
-from zope.interface import implements
-
-import os, fcntl, errno
-
-class EOF (Exception) : pass
-
-BUF_SIZE = 2048
-
-class Fifo (object) :
- implements(interfaces.IReadDescriptor)
-
- def __init__ (self, path) :
- self.path = path
- self.fd = None
-
- self._open()
-
- def _open (self) :
- self.fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
-
- reactor.addReader(self)
-
- def close (self) :
- if self.fd :
- reactor.removeReader(self)
- os.close(self.fd)
-
- self.fd = None
-
- def reopen (self) :
- """
- Close and re-open the fifo. This is useful for handling EOF
- """
- self.close()
- self._open()
-
- def _read (self, length) :
-
- try :
- data = os.read(self.fd, length)
-
- except OSError, e :
- if e.errno == errno.EAGAIN :
- return None
- else :
- raise
-
- if not data :
- raise EOF()
-
- return data
-
- def fileno (self) :
- return self.fd
-
- def doRead (self) :
- while True :
- try :
- data = self._read(BUF_SIZE)
- except EOF :
- self.handleEOF()
- return
-
- if data :
- self.dataReceived(data)
- else :
- break
-
- def dataReceived (self, data) :
- pass
-
- def handleEOF (self) :
- pass
-
- def connectionLost (self, reason) :
- self.close()
-
- def logPrefix (self) :
- return "fifo(%s)" % (self.path, )
-
- def __del__ (self) :
- """
- !!! this is important
- """
- self.close()
-
--- a/fixbot/irc.py Sat Nov 06 16:01:42 2010 +0200
+++ b/fixbot/irc.py Sat Nov 06 16:02:28 2010 +0200
@@ -3,8 +3,6 @@
from twisted.python import log
import traceback
-import buffer
-
class ReplyException (Exception) :
def __init__ (self, reply) :
self.reply = reply
--- a/fixbot/logwatch/__init__.py Sat Nov 06 16:01:42 2010 +0200
+++ b/fixbot/logwatch/__init__.py Sat Nov 06 16:02:28 2010 +0200
@@ -3,17 +3,17 @@
reformatting them before sending out to IRC.
"""
-from fixbot import api
+from fixbot import module
-class LogWatchModule (api.Module) :
+class LogWatchModule (module.Module) :
name = "logs"
- def __init__ (self, config) :
+ def __init__ (self, config, protocol) :
"""
Initialize with logwatch_sources from config
"""
-
- super(LogWatchModule, self).__init__(config)
+
+ super(LogWatchModule, self).__init__(config, protocol)
self.sources = config['logwatch-sources']
@@ -28,6 +28,6 @@
def error (self, msg) :
self.sendEvent("error", msg)
-def makeService (config) :
- return api.makeService(LogWatchModule, config)
+def makeService (config, protocol) :
+ return module.makeService(LogWatchModule, config, protocol)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/fixbot/logwatch/fifo.py Sat Nov 06 16:02:28 2010 +0200
@@ -0,0 +1,90 @@
+# read a stream from a fifo
+
+from twisted.internet import reactor, interfaces
+from twisted.python import log
+from zope.interface import implements
+
+import os, fcntl, errno
+
+class EOF (Exception) : pass
+
+BUF_SIZE = 2048
+
+class Fifo (object) :
+ implements(interfaces.IReadDescriptor)
+
+ def __init__ (self, path) :
+ self.path = path
+ self.fd = None
+
+ self._open()
+
+ def _open (self) :
+ self.fd = os.open(self.path, os.O_RDONLY | os.O_NONBLOCK)
+
+ reactor.addReader(self)
+
+ def close (self) :
+ if self.fd :
+ reactor.removeReader(self)
+ os.close(self.fd)
+
+ self.fd = None
+
+ def reopen (self) :
+ """
+ Close and re-open the fifo. This is useful for handling EOF
+ """
+ self.close()
+ self._open()
+
+ def _read (self, length) :
+
+ try :
+ data = os.read(self.fd, length)
+
+ except OSError, e :
+ if e.errno == errno.EAGAIN :
+ return None
+ else :
+ raise
+
+ if not data :
+ raise EOF()
+
+ return data
+
+ def fileno (self) :
+ return self.fd
+
+ def doRead (self) :
+ while True :
+ try :
+ data = self._read(BUF_SIZE)
+ except EOF :
+ self.handleEOF()
+ return
+
+ if data :
+ self.dataReceived(data)
+ else :
+ break
+
+ def dataReceived (self, data) :
+ pass
+
+ def handleEOF (self) :
+ pass
+
+ def connectionLost (self, reason) :
+ self.close()
+
+ def logPrefix (self) :
+ return "fifo(%s)" % (self.path, )
+
+ def __del__ (self) :
+ """
+ !!! this is important
+ """
+ self.close()
+
--- a/fixbot/logwatch/sources.py Sat Nov 06 16:01:42 2010 +0200
+++ b/fixbot/logwatch/sources.py Sat Nov 06 16:02:28 2010 +0200
@@ -5,8 +5,7 @@
from twisted.internet import protocol, reactor
from twisted.python import log
-from fixbot import fifo
-from fixbot.logwatch import message
+from fixbot.logwatch import fifo, message
class LogSource (object) :
"""
@@ -38,6 +37,13 @@
log.err(msg)
self.module.error(msg)
+ def connectionLost (self, reason) :
+ """
+ The transport we were connected to has dropped, possibly as a result of our handlers raising an error?
+ """
+
+ self.handleError("lost LogSource for %s: %s" % (self.name, reason.getErrorMessage()))
+
def handleData (self, data) :
"""
Feed binary data into the buffer, processing all lines via handleLine()
@@ -110,6 +116,9 @@
else :
raise ValueError(out)
+ def logPrefix (self) :
+ return "LogSource(%s)" % (self.name, )
+
class File (LogSource, protocol.ProcessProtocol) :
"""
Stream lines from a regular file using /usr/bin/tail -f
@@ -150,11 +159,7 @@
def handleEOF (self) :
self.handleError("!!! EOF on fifo %s, re-opening" % self.name)
- self.reopen()
-
- def connectionLost (self, reason) :
- super(Fifo, self).connectionLost(reason)
- self.handleError("lost fifo for %s: %s" % (self.name, reason.getErrorMessage()))
+ self.reopen()
class UnixDatagramSocket (LogSource, protocol.DatagramProtocol) :
"""
@@ -181,6 +186,4 @@
# handle it as a line of data
self.handleLine(data)
- def logPrefix (self) :
- return "LogSource(%s)" % (self.name, )
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/fixbot/module.py Sat Nov 06 16:02:28 2010 +0200
@@ -0,0 +1,131 @@
+from twisted.internet import protocol
+from twisted.application import service, internet
+from twisted.python import log
+
+import datetime
+
+class ModuleInfo (object) :
+ """
+ Nexus-side handle on a Module
+ """
+
+ # module's name
+ name = None
+
+ def __str__ (self) :
+ return "%s" % (self.name)
+
+ def __repr__ (self) :
+ return "<module %s>" % (self.name, )
+
+
+class Event (object) :
+ """
+ An Event, sent by a Module to the Nexus, to be distributed further
+ """
+
+ # the ModuleInfo object
+ module = None
+
+ # the event type as a string
+ type = None
+
+ # event message as a string (up to 64k, although that won't fit onto IRC..)
+ msg = None
+
+ # timestamp as a datetime.datetime
+ when = None
+
+ def __init__ (self, module, type, msg) :
+ self.module = module
+ self.type = type
+ self.msg = msg
+
+ self.when = datetime.datetime.now()
+
+ def __str__ (self) :
+ return "[%s] %s" % (self.type, self.msg)
+
+ def __repr__ (self) :
+ return "%s @ %s" % (self.type, self.when)
+
+
+class Module (ModuleInfo, protocol.ClientFactory) :
+ """
+ Module core, handles the API connection state and processes all messages
+ """
+
+ # our API connection to the Nexus
+ connection = None
+
+ def __init__ (self, config, protocol) :
+ """
+ config - configuration for connecting to nexus
+ protocol - API client protocol to use for this factory
+ """
+
+ self.protocol = protocol
+
+ self.connection = None
+
+ # XXX: legacy: self.secret = config['api-secret']
+
+
+ def _onRegistered (self, connection) :
+ """
+ Connected to nexus and registered
+ """
+
+ log.msg("Connected and registered")
+
+ self.connection = connection
+
+ # XXX: abort on errors?
+ self.handleConnect()
+
+
+ # XXX: unused, bad interface
+ def disconnect (self) :
+ """
+ Disconnect from Nexus
+ """
+
+ self.connection.transport.loseConnection()
+
+
+ def sendEvent (self, type, msg) :
+ """
+ Send event to nexus
+ """
+
+ self.connection.sendEvent(Event(self, type, msg))
+
+
+ def handleConnect (self) :
+ """
+ Do something once we are connected to nexus and registered
+ """
+
+ pass
+
+ def abort (self, err) :
+ """
+ Abort this module, disconnecting with the given error
+ """
+
+ self.connection.abort(str(err))
+
+def makeService (module_class, config, protocol) :
+ s = service.MultiService()
+
+ # build factory
+ factory = module_class(config, protocol)
+
+ # the API client
+ log.msg("Connecting to API server on [%s:%d]" % (config['api-server'], config['api-port']))
+ api_client = internet.TCPClient(config['api-server'], config['api-port'], factory)
+
+ api_client.setServiceParent(s)
+
+ return s
+
--- a/fixbot/nexus.py Sat Nov 06 16:01:42 2010 +0200
+++ b/fixbot/nexus.py Sat Nov 06 16:02:28 2010 +0200
@@ -6,14 +6,23 @@
import irc, api
class Nexus (object) :
- def __init__ (self) :
- """
- Must set .irc/.api attrs to irc.Factory/api.ServerFactory instances
- """
+ """
+ The core component, which has the set of registered ModuleInfo's from api, and the IRC connection
+ """
+
+ # runtime config options as a dict
+ config = None
+ # the irc.Factory instance
+ irc = None
+
+ # the api.ServerFactory instance
+ api = None
+
+ def __init__ (self, config) :
+ self.config = config
self.modules = dict()
-
def registerModule (self, module, transport) :
self.modules[module.name] = (module, transport)
@@ -36,7 +45,7 @@
return module, connection.transport.getPeer()
def makeService (config) :
- n = Nexus()
+ n = Nexus(config)
s = service.MultiService()
# the IRC side
@@ -52,7 +61,7 @@
irc_client.setServiceParent(s)
# the API side
- n.api = api.ServerFactory(n, config['api-secret'])
+ n.api = api.ServerFactory(n, config)
log.msg("Starting API server on [%s:%d]", config['api-port'], config['api-listen'])
api_server = internet.TCPServer(config['api-port'], n.api, interface=config['api-listen'])
@@ -61,10 +70,4 @@
# return the service collection
return s
-
-if __name__ == '__main__' :
- log.startLogging(sys.stderr)
- nexus = Nexus()
- reactor.run()
-
--- a/fixbot/setup.py Sat Nov 06 16:01:42 2010 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,10 +0,0 @@
-from distutils.core import setup, Extension
-
-_utmp = Extension('_utmp', sources=['_utmp.c'])
-
-setup(
- name = "FixBot",
- version = "0.1",
- description = "IRC bot for sysadmin things",
- ext_modules = [_utmp],
-)
--- a/fixbot/tap.py Sat Nov 06 16:01:42 2010 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,25 +0,0 @@
-from twisted.python import usage
-
-import nexus
-
-class NexusOptions (usage.Options) :
- optParameters = [
- ( "uid", "u", "fixbot", "user to run as" ),
- ( "gid", "g", "nogroup", "group to run as" ),
- ( "irc-hostname", "s", "irc.fixme.fi", "IRC server hostname", ),
- ( "irc-port", "p", 6667, "IRC server port", int ),
- ( "irc-nickname", "n", "FixBotDev", "IRC nickname", ),
- ( "irc-username", "U", "fixbot", "IRC username", ),
- ( "irc-channel", "c", "#fixme-test", "IRC channel", ),
- ( "api-listen" "l", "127.0.0.1", "address for API server to listen on" ),
- ( "api-port", "P", 34888, "port for API server to listen on", int ),
- ]
-
- optFlags = [
-
- ]
-
-def makeService (config) :
- # app = service.Application('fixbot', uid=config['uid'], gid=config['gid'])
- return nexus.makeService(config)
-
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/setup.py Sat Nov 06 16:02:28 2010 +0200
@@ -0,0 +1,10 @@
+from distutils.core import setup, Extension
+
+_utmp = Extension('fixbot._utmp', sources=['fixbot/_utmp.c'])
+
+setup(
+ name = "FixBot",
+ version = "0.1",
+ description = "IRC bot for sysadmin things",
+ ext_modules = [_utmp],
+)
--- a/twisted/plugins/fixbot_logwatch_plugin.py Sat Nov 06 16:01:42 2010 +0200
+++ b/twisted/plugins/fixbot_logwatch_plugin.py Sat Nov 06 16:02:28 2010 +0200
@@ -6,7 +6,7 @@
from twisted.application import internet
-from fixbot import api
+from fixbot import api, module
from fixbot import logwatch
class LogwatchOptions (api.ClientOptions) :
@@ -25,7 +25,7 @@
options = LogwatchOptions
def makeService (self, config) :
- return logwatch.makeService(config)
+ return logwatch.makeService(config, api.amp.ClientProtocol)
serviceMaker = MyServiceMaker()
--- a/twisted/plugins/fixbot_nexus_plugin.py Sat Nov 06 16:01:42 2010 +0200
+++ b/twisted/plugins/fixbot_nexus_plugin.py Sat Nov 06 16:02:28 2010 +0200
@@ -6,9 +6,9 @@
from twisted.application import internet
-from fixbot import config, nexus
+from fixbot import config, nexus, api
-class NexusOptions (config.ConfigOptions) :
+class NexusOptions (api.ServerOptions) :
optParameters = [
( "irc-server", "s", "irc.fixme.fi", "IRC server hostname", ),
@@ -16,9 +16,6 @@
( "irc-nickname", "N", "FixBotDev", "IRC nickname", ),
( "irc-username", "U", "fixbot", "IRC username", ),
( "irc-channel", "C", "#fixme-test", "IRC channel", ),
- ( "api-listen", "l", "127.0.0.1", "address for API server to listen on" ),
- ( "api-port", "P", 34888, "port for API server to listen on", int ),
- ( "api-secret", None, None, "secret key for API connections" ),
]
optFlags = [
@@ -32,8 +29,8 @@
options = NexusOptions
def makeService (self, config) :
- if config['api-secret'] is None :
- raise usage.UsageError("No value given for required option api-secret")
+ #if config['api-secret'] is None :
+ # raise usage.UsageError("No value given for required option api-secret")
return nexus.makeService(config)