some lib.event2 unitests
authorTero Marttila <terom@fixme.fi>
Mon, 31 Aug 2009 21:34:54 +0300
changeset 40 a5d498bbf40a
parent 39 075eaafa80a7
child 41 02f7c0539843
some lib.event2 unitests
test/lib_event.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/lib_event.py	Mon Aug 31 21:34:54 2009 +0300
@@ -0,0 +1,108 @@
+import unittest
+
+from qmsk.net.lib.event2.base import event_base
+from qmsk.net.lib.event2.event import event
+from qmsk.net.lib.event2.constants import *
+
+import os
+
+class MyEvent (event) :
+    def __init__ (self, base, fd, ev) :
+        event.__init__(self, base, fd, ev)
+
+        self.ev_in = ev
+        self.ev_out = None
+
+        self.res = None
+
+    def __call__ (self, fd, mask) :
+        self.ev_out = mask
+
+        self.res = (mask == self.ev_in)
+
+class TestEventBase (unittest.TestCase) :
+    """
+        Simple event_base bits
+    """
+    
+    def setUp (self) :
+        self.ev_base = event_base()
+
+    def test_loop_empty (self) :
+        """
+            loop() on an empty event_base returns False and doesn't block
+        """
+
+        self.assertFalse(self.ev_base.loop(nonblock=False, once=False))
+
+    def test_loop_empty_nb (self) :
+        """
+            loop(nonblock=False) on an empty event_base returns False
+        """
+
+        self.assertFalse(self.ev_base.loop(nonblock=True))
+
+ 
+class TestEventPipe (unittest.TestCase) :
+    """
+        Test basic event stuff using a pipe.
+    """
+
+    def setUp (self) :
+        self.ev_base = event_base()
+        self.fd_read, self.fd_write = os.pipe()
+
+    def test_event (self) :
+        """
+            Constructing an event works
+        """
+
+        self.assertTrue(event(self.ev_base, self.fd_read, EV_READ))
+
+    def test_read (self) :
+        """
+            Read event on pipe with data fires
+        """
+
+        ev = MyEvent(self.ev_base, self.fd_read, EV_READ)
+        ev.add(0.1) # 100ms
+
+        # trigger
+        os.write(self.fd_write, "foo")
+
+        # loop
+        self.ev_base.loop(nonblock=True)
+
+        # test
+        self.assertEquals(ev.ev_out, EV_READ)
+
+    def test_read_timeout (self) :
+        """
+            Read event on empty pipe timeouts
+        """
+
+        ev = MyEvent(self.ev_base, self.fd_read, EV_READ)
+        ev.add(0.0)
+
+        # loop
+        self.ev_base.loop(once=True)
+
+        # test
+        self.assertEquals(ev.ev_out, EV_TIMEOUT)
+
+    def test_read_nonblock (self) :
+        """
+            Read event on empty pipe doesn't block loop with nonblock=True
+        """
+
+        ev = MyEvent(self.ev_base, self.fd_read, EV_READ)
+        ev.add()
+
+        # loop once
+        self.ev_base.loop(nonblock=True)
+
+        # test
+        self.assertEquals(ev.ev_out, None)
+
+if __name__ == '__main__' :
+    unittest.main()