# HG changeset patch # User Tero Marttila # Date 1206031245 -7200 # Node ID 34d7897bd0f53f7517e0d592ca3e0081ded777c5 # Parent 5ab150c4a32869123f6e5697263faae24c29b17e logwatcher works committer: Tero Marttila diff -r 5ab150c4a328 -r 34d7897bd0f5 api.py --- a/api.py Thu Mar 20 18:29:42 2008 +0200 +++ b/api.py Thu Mar 20 18:40:45 2008 +0200 @@ -1,10 +1,11 @@ -from twisted.internet import protocol +from twisted.internet import protocol, reactor +from twisted.python import log from datetime import datetime import buffer -API_PORT = 34888 -SERVER_HOST = "10.0.0.5" +PORT = 34888 +SERVER_HOST = "127.0.0.1" class ModuleInfo (object) : """ @@ -53,16 +54,19 @@ def __repr__ (self) : return "%s @ %s" % (self.type, self.when) - + +CLIENT_COMMANDS = [ + "module_init", + "module_event", +] + +SERVER_COMMANDS = [ + "module_ok", +] + class ServerProtocol (buffer.StreamProtocol, protocol.Protocol) : - RECV_COMMANDS = [ - "module_init", - "module_event" - ] - - SEND_COMMANDS = [ - - ] + RECV_COMMANDS = CLIENT_COMMANDS + SEND_COMMANDS = SERVER_COMMANDS VALID_STATES = [ "wait_init", @@ -83,22 +87,34 @@ def connectionMade (self) : log.msg("Client connected") + def connectionLost (self, reason) : + log.msg("Connection lost: %s" % reason) + + if self.module : + self.factory.nexus.unregisterModule(self.module, reason.getErrorMessage()) + def on_module_init (self, i) : self._assert(not self.module, "module_init with non-None self.module") - m = self.module = api.ModuleInfo() + m = ModuleInfo() m.name = i.readVarLen('B') m.version = i.readItem('H') - m.event_types = buffer.readStringStream(i, 'B') + m.event_types = list(buffer.readStringStream(i, 'B')) m.addr = self.transport.getPeer() - self.module_name = module_name + self.module_name = m.name - log.msg("Got mod_init for %r" % m + log.msg("Got mod_init for %r" % m) + + self.factory.nexus.registerModule(m, self) - self.factory.nexus.registerModule(m, self) + self.module = m + + o = self.startCommand('module_ok') + + self.send(o) def on_module_event (self, i) : self._assert(self.module, "module_event with None self.module!") @@ -106,9 +122,9 @@ event_type = i.readEnum(self.module.event_types) event_msg = i.readVarLen('B') - e = api.Event(self.module, event_type, event_msg) + e = Event(self.module, event_type, event_msg) - log.msg("Got mod_event of %r" % (e) + log.msg("Got mod_event of %r" % (e)) self.factory.nexus.handleEvent(e) @@ -118,4 +134,63 @@ else : return super(APIProtocol, self).logPrefix() +class ClientProtocol (buffer.StreamProtocol, protocol.Protocol) : + RECV_COMMANDS = SERVER_COMMANDS + SEND_COMMANDS = CLIENT_COMMANDS + def connectionMade (self) : + log.msg("Connected to API server, sending module init message") + + o = self.startCommand('module_init') + o.writeVarLen('B', self.factory.name) + o.writeItem("H", self.factory.version) + buffer.writeStringStream(o, 'B', self.factory.event_types) + + self.send(o) + + def sendEvent (self, event) : + o = self.startCommand('module_event') + o.writeEnum(self.factory.event_types, event.type) + o.writeVarLen('B', event.msg) + + self.send(o) + + def on_module_ok (self, i) : + log.msg("Registration OK") + + self.factory.connected(self) + + def logPrefix (self) : + return "module %s:%d client" % (self.factory.name, self.factory.version) + +class Module (ModuleInfo, protocol.ClientFactory) : + protocol = ClientProtocol + + def __init__ (self) : + log.msg("Connecting to %s:%d" % (SERVER_HOST, PORT)) + reactor.connectTCP(SERVER_HOST, PORT, self) + + self.connection = None + + def connected (self, connection) : + log.msg("Connected!") + self.connection = connection + + self.handleConnect() + + def disconnect (self) : + self.connection.transport.loseConnection() + + def sendEvent (self, type, msg) : + self.connection.sendEvent(self.buildEvent(type, msg)) + + def buildEvent (self, type, msg) : + return Event(self, type, msg) + + def handleConnect (self) : + """ + Do something + """ + + pass + diff -r 5ab150c4a328 -r 34d7897bd0f5 buffer.py --- a/buffer.py Thu Mar 20 18:29:42 2008 +0200 +++ b/buffer.py Thu Mar 20 18:40:45 2008 +0200 @@ -168,6 +168,8 @@ """ self.write(struct.pack(self.STRUCT_PREFIX + fmt, *args)) + + writeItem = writeStruct def writeVarLen (self, len_type, data) : """ @@ -333,8 +335,13 @@ def writeStringStream (stream, varlen_type, strings) : """ + Writes strings from the given iterable into the given stream using the given varlen_type, ending with a null-length token """ + for item in strings : + stream.writeVarLen(varlen_type, item) + + stream.writeItem(varlen_type, 0) class StreamProtocol (object) : """ diff -r 5ab150c4a328 -r 34d7897bd0f5 irc.py --- a/irc.py Thu Mar 20 18:29:42 2008 +0200 +++ b/irc.py Thu Mar 20 18:40:45 2008 +0200 @@ -1,46 +1,61 @@ from twisted.words.protocols import irc from twisted.internet import protocol +from twisted.python import log import buffer -HOSTNAME = "irc.fixme.fi" +HOSTNAME = "irc.marttila.de" PORT = 6667 NICKNAME = "FixBot" USERNAME = "fixme" -CHANNEL = "#fixme" +CHANNEL = "#fixme-test" -class BotProtocol (irc.IRCClient) : +class BotProtocol (irc.IRCClient, object) : """ Fixme IRC bot """ + + def __init__ (self) : + self.nickname = NICKNAME + self.username = USERNAME # housekeeping def connectionMade (self) : log.msg("Connected") - super(FixBot, self).connectionMade() + super(BotProtocol, self).connectionMade() def connectionLost (self, reason) : log.msg("Connection lost: %s" % reason) - super(FixBot, self).connectionLost(reason) + super(BotProtocol, self).connectionLost(reason) def signedOn (self) : - log.msg("Signed on, joining channel %s" % channel) + log.msg("Signed on, joining channel %s" % CHANNEL) self.join(CHANNEL) def joined (self, channel) : log.msg("Joined channel %s" % channel) + + self.factory.connection = self # our actual functionality def send (self, msg) : - self.msg(CHANNEL, str(msg)) + msg = str(msg) + + if len(msg) > 480 : + log.msg("truncating: %s" % msg) + msg = msg[:480] + "..." + + msg = msg.replace("\n", "\\n").replace("\r", "\\r").replace("\0", "\\0") + + self.msg(CHANNEL, msg) def sendEvent (self, event) : - self.msg(CHANNEL, "[%s.%s] %s" % (event.module.name, event.type, event.msg)) + self.send("[%s.%s] %s" % (event.module.name, event.type, event.msg)) def moduleConnected (self, module, addr) : - self.msg(CHANNEL, "{modules} Module %s connected from %s:%d, version %s" % (module.name, addr.host, addr.port, module.version)) + self.send("{modules.%s} connected from %s:%d, version %s" % (module.name, addr.host, addr.port, module.version)) - def moduleDisconnected (self, module) : - self.msg(CHANNEL, "{modules} Module %s disconnected" % (module.name, )) + def moduleDisconnected (self, module, reason) : + self.send("{modules.%s} disconnected: %s" % (module.name, reason)) diff -r 5ab150c4a328 -r 34d7897bd0f5 logwatcher.py --- a/logwatcher.py Thu Mar 20 18:29:42 2008 +0200 +++ b/logwatcher.py Thu Mar 20 18:40:45 2008 +0200 @@ -16,16 +16,28 @@ self.module.error("tail for %s: %s" % (self.name, data)) def outReceived (self, data) : - data = buf + data + data = self.buf + data while "\n" in data : line, data = data.split("\n", 1) + + log.msg("Matching line `%s'..." % line) for filter in self.filters : out = filter.test(line) + if out : + log.msg("\t%s: %s" % (filter.event_type, out)) + self.module.sendEvent(filter.event_type, out) + self.buf = data + def processEnded (self, reason) : + msg = "tail process for %s quit: %s" % (self.name, reason.getErrorMessage()) + + log.err(msg) + self.module.error(msg) + class Filter (object) : def __init__ (self, regexp, event_type) : self.regexp = re.compile(regexp) @@ -41,11 +53,13 @@ return match.string class SudoFilter (Filter) : + REGEXP = "sudo:\s*(?P\S+) : TTY=(?P\S+) ; PWD=(?P.+?) ; USER=(?P\S+) ; COMMAND=(?P.*)" + def __init__ (self) : - super(Filter, self).__init__("sudo: (?P\w+) : TTY=(?P\w+) ; PWD=(?P\w+) ; USER=(?P\w+) ; COMMAND=(?P\w+)", "sudo") + super(SudoFilter, self).__init__(self.REGEXP, "sudo") def _filter (self, match) : - return "%(username)s:%(tty)s:%(pwd)s - `%(command)s` as %(target_user)s" + return "%(username)s:%(tty)s - %(pwd)s - `%(command)s` as %(target_user)s" % match.groupdict() class ExampleModule (api.Module) : name = "logs" @@ -70,9 +84,9 @@ self.log_objs = dict() for name, file, filters in self.log_files : - log.msg("%s - %s..." % (name, file))) + log.msg("\t%s - %s..." % (name, file)) - p = self.log_objs[name] = TailProcessProtocol(filters) + p = self.log_objs[name] = TailProcessProtocol(self, name, filters) reactor.spawnProcess(p, "/usr/bin/tail", ["tail", "--follow=name", file]) diff -r 5ab150c4a328 -r 34d7897bd0f5 nexus.py --- a/nexus.py Thu Mar 20 18:29:42 2008 +0200 +++ b/nexus.py Thu Mar 20 18:40:45 2008 +0200 @@ -1,95 +1,55 @@ -from twisted.words.protocols import irc from twisted.internet import reactor, protocol from twisted.python import log - -import buffer - -NICKNAME = "FixBot" -USERNAME = "fixme" -CHANNEL = "#fixme" -API_PORT = 34888 - -class IRCBot (irc.IRCClient) : - """ - Fixme IRC bot - """ - - # housekeeping - def connectionMade (self) : - log.msg("Connected") - super(FixBot, self).connectionMade() - - def connectionLost (self, reason) : - log.msg("Connection lost: %s" % reason) - super(FixBot, self).connectionLost(reason) - - def signedOn (self) : - log.msg("Signed on, joining channel %s" % channel) - self.join(CHANNEL) - - def joined (self, channel) : - log.msg("Joined channel %s" % channel) - - # our actual functionality - def sendEvent (self, module, event) : - self.msg(CHANNEL, "[%s] %s" % (module, event)) - - def moduleConnected (self, module, version) : - self.msg(CHANNEL, "{modules} Module %s connected, version %s" % (module, version)) - - def moduleDisconnected (self, module) : - self.msg(CHANNEL, "{modules} Module %s disconnected" % (module, )) +import sys -class APIProtocol (buffer.StreamProtocol, protocol.Protocol) : - RECV_COMMANDS = [ - "module_init", - "module_event" - ] - - SEND_COMMANDS = [ - - ] - - VALID_STATES = [ - "wait_init", - "wait_event" - ] - - # proto state - state = None - - # module info - module = None - - - def on_module_init (self, i) : - m = self.module = ModuleInfo() - - m.name = i.readVarLen('B') - m.version = i.readItem('H') - - - - addr = self.transport.getPeer() - - self.module_name = module_name - - log.msg("Got mod_init for %s:%d from %s:%d" % (module_name, module_version, addr.host, addr.port)) - - self.factory.registerModule(module_name, module_version, self) - - def on_module_event (self, i) : - event_type = i.readVarLen('B') - event_msg = i.readVarLen('B') - - log.msg("Got mod_event of type %s" % (event_type, )) - - self.factory.handleEvent(self, event_type, event_msg) +import irc, api class IRCFactory (protocol.ClientFactory) : - protocol = IRCBot + protocol = irc.BotProtocol + def __init__ (self, nexus) : + self.nexus = nexus + + log.msg("Connection to IRC at %s:%d" % (irc.HOSTNAME, irc.PORT)) + reactor.connectTCP(irc.HOSTNAME, irc.PORT, self) + + self.connection = None + + def connected (self, connection) : + self.connection = connection + +class APIFactory (protocol.ServerFactory) : + protocol = api.ServerProtocol + + def __init__ (self, nexus) : + self.nexus = nexus + + log.msg("API listening on %s:%d" % (api.SERVER_HOST, api.PORT)) + reactor.listenTCP(api.PORT, self, interface=api.SERVER_HOST) + +class Nexus (object) : def __init__ (self) : + self.modules = dict() -class APIFactory (protocol.ServerFactory - protocol = APIProtocol + self.irc = IRCFactory(self) + self.api = APIFactory(self) + + def registerModule (self, module, transport) : + self.modules[module.name] = (module, transport) + + self.irc.connection.moduleConnected(module, transport.transport.getPeer()) + + def unregisterModule (self, module, reason) : + del self.modules[module.name] + + self.irc.connection.moduleDisconnected(module, reason) + + def handleEvent (self, event) : + self.irc.connection.sendEvent(event) + +if __name__ == '__main__' : + log.startLogging(sys.stderr) + + nexus = Nexus() + reactor.run() +