test/lib_event.py
author Tero Marttila <terom@fixme.fi>
Mon, 31 Aug 2009 21:34:54 +0300
changeset 40 a5d498bbf40a
child 42 0ff56f7216ee
permissions -rw-r--r--
some lib.event2 unit tests
import unittest

from qmsk.net.lib.event2.base import event_base
from qmsk.net.lib.event2.event import event
from qmsk.net.lib.event2.constants import *

import os

class MyEvent (event) :
    """
        Event subclass used by the tests: remembers the mask it was created with and
        records the mask passed to it when it is called.
    """

    def __init__ (self, base, fd, ev) :
        """
            Initialize the underlying event with (base, fd, ev) and reset the recorded state.
        """

        event.__init__(self, base, fd, ev)

        # nothing has been delivered yet
        self.res = None
        self.ev_out = None

        # the mask this event was constructed with
        self.ev_in = ev

    def __call__ (self, fd, mask) :
        """
            Record the delivered mask, and whether it equals the mask given at construction.

            Presumably invoked by the event machinery when the event fires; the tests
            inspect ev_out after running the loop.
        """

        matched = (mask == self.ev_in)

        self.ev_out = mask
        self.res = matched

class TestEventBase (unittest.TestCase) :
    """
        Checks on an event_base that has had no events added to it.
    """

    def setUp (self) :
        # each test runs against its own freshly constructed event_base
        self.ev_base = event_base()

    def test_loop_empty (self) :
        """
            loop(nonblock=False, once=False) on an empty event_base returns a false value
            (and is expected to return rather than block)
        """

        ret = self.ev_base.loop(nonblock=False, once=False)

        self.assertFalse(ret)

    def test_loop_empty_nb (self) :
        """
            loop(nonblock=True) on an empty event_base returns a false value
        """

        ret = self.ev_base.loop(nonblock=True)

        self.assertFalse(ret)

 
class TestEventPipe (unittest.TestCase) :
    """
        Test basic event behaviour using the read end of an OS pipe.

        Each test gets a fresh event_base and a fresh pipe; the pipe is closed again in tearDown.
    """

    def setUp (self) :
        """
            Create the event_base and the pipe used by the test.
        """

        self.ev_base = event_base()
        self.fd_read, self.fd_write = os.pipe()

    def tearDown (self) :
        """
            Close both ends of the pipe, so that file descriptors are not leaked between tests.
        """

        for fd in (self.fd_read, self.fd_write) :
            os.close(fd)

    def test_event (self) :
        """
            Constructing an event works
        """

        self.assertTrue(event(self.ev_base, self.fd_read, EV_READ))

    def test_read (self) :
        """
            Read event on pipe with data fires
        """

        ev = MyEvent(self.ev_base, self.fd_read, EV_READ)
        ev.add(0.1) # 100ms

        # trigger; bytes literal is identical to "foo" on Python 2.6+ and required by os.write on Python 3
        os.write(self.fd_write, b"foo")

        # loop
        self.ev_base.loop(nonblock=True)

        # test
        self.assertEqual(ev.ev_out, EV_READ)

    def test_read_timeout (self) :
        """
            Read event on empty pipe times out
        """

        ev = MyEvent(self.ev_base, self.fd_read, EV_READ)
        ev.add(0.0)

        # loop
        self.ev_base.loop(once=True)

        # test
        self.assertEqual(ev.ev_out, EV_TIMEOUT)

    def test_read_nonblock (self) :
        """
            Read event on empty pipe doesn't block loop with nonblock=True
        """

        # no timeout given, and nothing is written to the pipe
        ev = MyEvent(self.ev_base, self.fd_read, EV_READ)
        ev.add()

        # loop once
        self.ev_base.loop(nonblock=True)

        # test: the event was never called, so ev_out still has its initial value
        self.assertEqual(ev.ev_out, None)

# Run every test case in this module when the file is executed as a script
if __name__ == '__main__' :
    unittest.main()