logwatcher works
authorTero Marttila <terom@paivola.fi>
Thu, 20 Mar 2008 18:40:45 +0200
changeset 4 34d7897bd0f5
parent 3 5ab150c4a328
child 5 8e7493df9f52
logwatcher works

committer: Tero Marttila <terom@paivola.fi>
api.py
buffer.py
irc.py
logwatcher.py
nexus.py
--- a/api.py	Thu Mar 20 18:29:42 2008 +0200
+++ b/api.py	Thu Mar 20 18:40:45 2008 +0200
@@ -1,10 +1,11 @@
-from twisted.internet import protocol
+from twisted.internet import protocol, reactor
+from twisted.python import log
 from datetime import datetime
 
 import buffer
 
-API_PORT        = 34888
-SERVER_HOST     = "10.0.0.5"
+PORT            = 34888
+SERVER_HOST     = "127.0.0.1"
 
 class ModuleInfo (object) :
     """
@@ -53,16 +54,19 @@
     
     def __repr__ (self) :
         return "%s @ %s" % (self.type, self.when)
-        
+ 
+CLIENT_COMMANDS = [
+    "module_init",
+    "module_event",
+]
+
+SERVER_COMMANDS = [
+    "module_ok",
+]
+       
 class ServerProtocol (buffer.StreamProtocol, protocol.Protocol) :
-    RECV_COMMANDS = [
-        "module_init",
-        "module_event"
-    ]
-
-    SEND_COMMANDS = [
-
-    ]
+    RECV_COMMANDS = CLIENT_COMMANDS
+    SEND_COMMANDS = SERVER_COMMANDS    
 
     VALID_STATES = [
         "wait_init",
@@ -83,22 +87,34 @@
     def connectionMade (self) :
         log.msg("Client connected")
 
+    def connectionLost (self, reason) :
+        log.msg("Connection lost: %s" % reason)
+        
+        if self.module :
+            self.factory.nexus.unregisterModule(self.module, reason.getErrorMessage())
+
     def on_module_init (self, i) :
         self._assert(not self.module, "module_init with non-None self.module")
 
-        m = self.module = api.ModuleInfo()
+        m = ModuleInfo()
         
         m.name = i.readVarLen('B')
         m.version = i.readItem('H')
 
-        m.event_types = buffer.readStringStream(i, 'B')
+        m.event_types = list(buffer.readStringStream(i, 'B'))
         m.addr = self.transport.getPeer()
 
-        self.module_name = module_name
+        self.module_name = m.name
 
-        log.msg("Got mod_init for %r" % m
+        log.msg("Got mod_init for %r" % m)
+        
+        self.factory.nexus.registerModule(m, self)
 
-        self.factory.nexus.registerModule(m, self)
+        self.module = m
+
+        o = self.startCommand('module_ok')
+
+        self.send(o)
 
     def on_module_event (self, i) :
         self._assert(self.module, "module_event with None self.module!")
@@ -106,9 +122,9 @@
         event_type = i.readEnum(self.module.event_types)
         event_msg = i.readVarLen('B')
         
-        e = api.Event(self.module, event_type, event_msg)
+        e = Event(self.module, event_type, event_msg)
 
-        log.msg("Got mod_event of %r" % (e)
+        log.msg("Got mod_event of %r" % (e))
 
         self.factory.nexus.handleEvent(e)
 
@@ -118,4 +134,63 @@
         else :
             return super(APIProtocol, self).logPrefix()
 
+class ClientProtocol (buffer.StreamProtocol, protocol.Protocol) :
+    RECV_COMMANDS = SERVER_COMMANDS
+    SEND_COMMANDS = CLIENT_COMMANDS
 
+    def connectionMade (self) :
+        log.msg("Connected to API server, sending module init message")
+
+        o = self.startCommand('module_init')
+        o.writeVarLen('B', self.factory.name)
+        o.writeItem("H", self.factory.version)
+        buffer.writeStringStream(o, 'B', self.factory.event_types)
+
+        self.send(o)
+
+    def sendEvent (self, event) :
+        o = self.startCommand('module_event')
+        o.writeEnum(self.factory.event_types, event.type)
+        o.writeVarLen('B', event.msg)
+
+        self.send(o)
+
+    def on_module_ok (self, i) :
+        log.msg("Registration OK")
+
+        self.factory.connected(self)
+    
+    def logPrefix (self) :
+        return "module %s:%d client" % (self.factory.name, self.factory.version)
+
+class Module (ModuleInfo, protocol.ClientFactory) :
+    protocol = ClientProtocol
+
+    def __init__ (self) :
+        log.msg("Connecting to %s:%d" % (SERVER_HOST, PORT))
+        reactor.connectTCP(SERVER_HOST, PORT, self)
+
+        self.connection = None
+        
+    def connected (self, connection) :
+        log.msg("Connected!")
+        self.connection = connection
+
+        self.handleConnect()
+
+    def disconnect (self) :
+        self.connection.transport.loseConnection()
+    
+    def sendEvent (self, type, msg) :
+        self.connection.sendEvent(self.buildEvent(type, msg))
+
+    def buildEvent (self, type, msg) :
+        return Event(self, type, msg)
+
+    def handleConnect (self) :
+        """
+            Do something
+        """
+
+        pass
+
--- a/buffer.py	Thu Mar 20 18:29:42 2008 +0200
+++ b/buffer.py	Thu Mar 20 18:40:45 2008 +0200
@@ -168,6 +168,8 @@
         """
 
         self.write(struct.pack(self.STRUCT_PREFIX + fmt, *args))
+
+    writeItem = writeStruct
         
     def writeVarLen (self, len_type, data) :
         """
@@ -333,8 +335,13 @@
 
 def writeStringStream (stream, varlen_type, strings) :
     """
+        Writes strings from the given iterable into the given stream using the given varlen_type, ending with a null-length token
     """
 
+    for item in strings :
+        stream.writeVarLen(varlen_type, item)
+
+    stream.writeItem(varlen_type, 0)
 
 class StreamProtocol (object) :
     """
--- a/irc.py	Thu Mar 20 18:29:42 2008 +0200
+++ b/irc.py	Thu Mar 20 18:40:45 2008 +0200
@@ -1,46 +1,61 @@
 from twisted.words.protocols import irc
 from twisted.internet import protocol
+from twisted.python import log
 
 import buffer
 
-HOSTNAME        = "irc.fixme.fi"
+HOSTNAME        = "irc.marttila.de"
 PORT            = 6667
 NICKNAME        = "FixBot"
 USERNAME        = "fixme"
-CHANNEL         = "#fixme"
+CHANNEL         = "#fixme-test"
 
-class BotProtocol (irc.IRCClient) :
+class BotProtocol (irc.IRCClient, object) :
     """
         Fixme IRC bot
     """
+    
+    def __init__ (self) :
+        self.nickname = NICKNAME
+        self.username = USERNAME
 
     # housekeeping
     def connectionMade (self) :
         log.msg("Connected")
-        super(FixBot, self).connectionMade()
+        super(BotProtocol, self).connectionMade()
 
     def connectionLost (self, reason) :
         log.msg("Connection lost: %s" % reason)
-        super(FixBot, self).connectionLost(reason)
+        super(BotProtocol, self).connectionLost(reason)
 
     def signedOn (self) :
-        log.msg("Signed on, joining channel %s" % channel)
+        log.msg("Signed on, joining channel %s" % CHANNEL)
         self.join(CHANNEL)
 
     def joined (self, channel) :
         log.msg("Joined channel %s" % channel)
+
+        self.factory.connection = self
     
     # our actual functionality
     def send (self, msg) :
-        self.msg(CHANNEL, str(msg))
+        msg = str(msg)
+
+        if len(msg) > 480 :
+            log.msg("truncating: %s" % msg)
+            msg = msg[:480] + "..."
+
+        msg = msg.replace("\n", "\\n").replace("\r", "\\r").replace("\0", "\\0")
+
+        self.msg(CHANNEL, msg)
 
     def sendEvent (self, event) :
-        self.msg(CHANNEL, "[%s.%s] %s" % (event.module.name, event.type, event.msg))
+        self.send("[%s.%s] %s" % (event.module.name, event.type, event.msg))
 
     def moduleConnected (self, module, addr) :
-        self.msg(CHANNEL, "{modules} Module %s connected from %s:%d, version %s" % (module.name, addr.host, addr.port, module.version))
+        self.send("{modules.%s} connected from %s:%d, version %s" % (module.name, addr.host, addr.port, module.version))
 
-    def moduleDisconnected (self, module) :
-        self.msg(CHANNEL, "{modules} Module %s disconnected" % (module.name, ))
+    def moduleDisconnected (self, module, reason) :
+        self.send("{modules.%s} disconnected: %s" % (module.name, reason))
 
 
--- a/logwatcher.py	Thu Mar 20 18:29:42 2008 +0200
+++ b/logwatcher.py	Thu Mar 20 18:40:45 2008 +0200
@@ -16,16 +16,28 @@
         self.module.error("tail for %s: %s" % (self.name, data))
 
     def outReceived (self, data) :
-        data = buf + data
+        data = self.buf + data
         
         while "\n" in data :
             line, data = data.split("\n", 1)
+            
+            log.msg("Matching line `%s'..." % line)
 
             for filter in self.filters :
                 out = filter.test(line)
 
+                if out :
+                    log.msg("\t%s: %s" % (filter.event_type, out))
+                    self.module.sendEvent(filter.event_type, out)
+
         self.buf = data
 
+    def processEnded (self, reason) :
+        msg = "tail process for %s quit: %s" % (self.name, reason.getErrorMessage())
+
+        log.err(msg)
+        self.module.error(msg)
+
 class Filter (object) :
     def __init__ (self, regexp, event_type) :
         self.regexp = re.compile(regexp)
@@ -41,11 +53,13 @@
         return match.string
 
 class SudoFilter (Filter) :
+    REGEXP = "sudo:\s*(?P<username>\S+) : TTY=(?P<tty>\S+) ; PWD=(?P<pwd>.+?) ; USER=(?P<target_user>\S+) ; COMMAND=(?P<command>.*)"
+
     def __init__ (self) :
-        super(Filter, self).__init__("sudo: (?P<username>\w+) : TTY=(?P<tty>\w+) ; PWD=(?P<pwd>\w+) ; USER=(?P<target_user>\w+) ; COMMAND=(?P<command>\w+)", "sudo")
+        super(SudoFilter, self).__init__(self.REGEXP, "sudo")
 
     def _filter (self, match) :
-        return "%(username)s:%(tty)s:%(pwd)s - `%(command)s` as %(target_user)s"
+        return "%(username)s:%(tty)s - %(pwd)s - `%(command)s` as %(target_user)s" % match.groupdict()
 
 class ExampleModule (api.Module) :
     name = "logs"
@@ -70,9 +84,9 @@
         self.log_objs = dict()
 
         for name, file, filters in self.log_files :
-            log.msg("%s - %s..." % (name, file)))
+            log.msg("\t%s - %s..." % (name, file))
 
-            p = self.log_objs[name] = TailProcessProtocol(filters)
+            p = self.log_objs[name] = TailProcessProtocol(self, name, filters)
 
             reactor.spawnProcess(p, "/usr/bin/tail", ["tail", "--follow=name", file])
     
--- a/nexus.py	Thu Mar 20 18:29:42 2008 +0200
+++ b/nexus.py	Thu Mar 20 18:40:45 2008 +0200
@@ -1,95 +1,55 @@
-from twisted.words.protocols import irc
 from twisted.internet import reactor, protocol
 from twisted.python import log
-
-import buffer
-
-NICKNAME        = "FixBot"
-USERNAME        = "fixme"
-CHANNEL         = "#fixme"
-API_PORT        = 34888
-
-class IRCBot (irc.IRCClient) :
-    """
-        Fixme IRC bot
-    """
-    
-    # housekeeping
-    def connectionMade (self) :
-        log.msg("Connected")
-        super(FixBot, self).connectionMade()
-
-    def connectionLost (self, reason) :
-        log.msg("Connection lost: %s" % reason)
-        super(FixBot, self).connectionLost(reason)
-
-    def signedOn (self) :
-        log.msg("Signed on, joining channel %s" % channel)
-        self.join(CHANNEL)
-
-    def joined (self, channel) :
-        log.msg("Joined channel %s" % channel)
-    
-    # our actual functionality
-    def sendEvent (self, module, event) :
-        self.msg(CHANNEL, "[%s] %s" % (module, event))
-
-    def moduleConnected (self, module, version) :
-        self.msg(CHANNEL, "{modules} Module %s connected, version %s" % (module, version))
-
-    def moduleDisconnected (self, module) :
-        self.msg(CHANNEL, "{modules} Module %s disconnected" % (module, ))
+import sys
 
-class APIProtocol (buffer.StreamProtocol, protocol.Protocol) :
-    RECV_COMMANDS = [
-        "module_init",
-        "module_event"
-    ]
-
-    SEND_COMMANDS = [
-
-    ]
-
-    VALID_STATES = [
-        "wait_init",
-        "wait_event"
-    ]
-    
-    # proto state
-    state = None
-
-    # module info
-    module = None
-
-
-    def on_module_init (self, i) :
-        m = self.module = ModuleInfo()
-
-        m.name = i.readVarLen('B')
-        m.version = i.readItem('H')
-
-        
-        
-        addr = self.transport.getPeer()
-
-        self.module_name = module_name
-
-        log.msg("Got mod_init for %s:%d from %s:%d" % (module_name, module_version, addr.host, addr.port))
-
-        self.factory.registerModule(module_name, module_version, self)
-
-    def on_module_event (self, i) :
-        event_type = i.readVarLen('B')
-        event_msg = i.readVarLen('B')
-
-        log.msg("Got mod_event of type %s" % (event_type, ))
-
-        self.factory.handleEvent(self, event_type, event_msg)
+import irc, api
 
 class IRCFactory (protocol.ClientFactory) :
-    protocol = IRCBot
+    protocol = irc.BotProtocol
 
+    def __init__ (self, nexus) :
+        self.nexus = nexus
+        
+        log.msg("Connection to IRC at %s:%d" % (irc.HOSTNAME, irc.PORT))
+        reactor.connectTCP(irc.HOSTNAME, irc.PORT, self)
+
+        self.connection = None
+    
+    def connected (self, connection) :
+        self.connection = connection
+
+class APIFactory (protocol.ServerFactory) :
+    protocol = api.ServerProtocol
+
+    def __init__ (self, nexus) :
+        self.nexus = nexus
+        
+        log.msg("API listening on %s:%d" % (api.SERVER_HOST, api.PORT))
+        reactor.listenTCP(api.PORT, self, interface=api.SERVER_HOST)
+
+class Nexus (object) :
     def __init__ (self) :
+        self.modules = dict()
 
-class APIFactory (protocol.ServerFactory
-    protocol = APIProtocol
+        self.irc = IRCFactory(self)
+        self.api = APIFactory(self)
+
+    def registerModule (self, module, transport) :
+        self.modules[module.name] = (module, transport)
+
+        self.irc.connection.moduleConnected(module, transport.transport.getPeer())
+
+    def unregisterModule (self, module, reason) :
+        del self.modules[module.name]
+
+        self.irc.connection.moduleDisconnected(module, reason)
+    
+    def handleEvent (self, event) :
+        self.irc.connection.sendEvent(event)
+        
+if __name__ == '__main__' :
+    log.startLogging(sys.stderr)
+
+    nexus = Nexus()
+    reactor.run()
+