terom@40: import unittest terom@40: terom@40: from qmsk.net.lib.event2.base import event_base terom@40: from qmsk.net.lib.event2.event import event terom@40: from qmsk.net.lib.event2.constants import * terom@40: terom@40: import os terom@40: terom@40: class MyEvent (event) : terom@40: def __init__ (self, base, fd, ev) : terom@40: event.__init__(self, base, fd, ev) terom@40: terom@40: self.ev_in = ev terom@40: self.ev_out = None terom@40: terom@40: self.res = None terom@40: terom@40: def __call__ (self, fd, mask) : terom@40: self.ev_out = mask terom@40: terom@40: self.res = (mask == self.ev_in) terom@40: terom@40: class TestEventBase (unittest.TestCase) : terom@40: """ terom@40: Simple event_base bits terom@40: """ terom@40: terom@40: def setUp (self) : terom@40: self.ev_base = event_base() terom@40: terom@40: def test_loop_empty (self) : terom@40: """ terom@40: loop() on an empty event_base returns False and doesn't block terom@40: """ terom@40: terom@40: self.assertFalse(self.ev_base.loop(nonblock=False, once=False)) terom@40: terom@40: def test_loop_empty_nb (self) : terom@40: """ terom@40: loop(nonblock=False) on an empty event_base returns False terom@40: """ terom@40: terom@40: self.assertFalse(self.ev_base.loop(nonblock=True)) terom@40: terom@42: class TestEvent (unittest.TestCase) : terom@42: """ terom@42: Simple event bits. terom@42: """ terom@42: terom@42: def setUp (self) : terom@42: self.ev_base = event_base() terom@42: self.ev = event(self.ev_base, 0, EV_READ) terom@42: terom@42: def test_add (self) : terom@42: """ terom@42: .add() works. terom@42: """ terom@42: terom@42: self.assertFalse(self.ev.pending(EV_READ)) terom@42: terom@42: self.ev.add() terom@42: terom@42: self.assertTrue(self.ev.pending(EV_READ)) terom@42: terom@42: def test_add_timeout (self) : terom@42: """ terom@42: .add() with timeout works terom@42: """ terom@42: terom@42: self.ev.add(1.1) terom@42: self.assertTrue(self.ev.pending()) terom@42: terom@42: def test_add_del (self) : terom@42: """ terom@42: .add() and then .delete() works terom@42: """ terom@42: terom@42: self.assertFalse(self.ev.pending()) terom@42: terom@42: # add it in terom@42: self.ev.add() terom@42: terom@42: self.assertTrue(self.ev.pending()) terom@42: terom@42: # ensure that the ev_base is indeed not empty terom@42: self.assertTrue(self.ev_base.loop(nonblock=True)) terom@42: terom@42: terom@42: # remove it again terom@42: self.ev.delete() terom@42: terom@42: self.assertFalse(self.ev.pending()) terom@42: terom@42: # ensure that the ev_base is indeed empty terom@42: self.assertFalse(self.ev_base.loop(nonblock=True)) terom@42: terom@42: def test_invalid_base (self) : terom@42: """ terom@42: Building an event with invalid base (None) fails terom@42: """ terom@42: terom@42: # XXX: this doesn't actually seem to check against None, just the type..? terom@42: self.assertRaises(TypeError, event, None, 0, EV_READ) terom@42: terom@42: def test_illegal_fd (self) : terom@42: """ terom@42: Building an event with an illegal fd (-1) fails terom@42: """ terom@42: terom@42: self.assertRaises(ValueError, event, self.ev_base, -1, EV_READ) terom@42: terom@42: def test_invalid_fd (self) : terom@42: """ terom@42: Building an event with an invalid fd (>= 0) fails when calling .add() terom@42: """ terom@42: terom@42: ev = event(self.ev_base, 666, EV_READ) terom@42: terom@42: # XXX: be more specific about Exception terom@42: self.assertRaises(Exception, ev.add) terom@42: terom@42: def test_invalid_mask_zero (self) : terom@42: """ terom@42: Building an event with an invalid mask (0)... XXX: works? terom@42: """ terom@42: terom@42: ev = event(self.ev_base, 0, 0) terom@42: ev.add() terom@42: terom@40: class TestEventPipe (unittest.TestCase) : terom@40: """ terom@40: Test basic event stuff using a pipe. terom@40: """ terom@40: terom@40: def setUp (self) : terom@40: self.ev_base = event_base() terom@40: self.fd_read, self.fd_write = os.pipe() terom@40: terom@40: def test_event (self) : terom@40: """ terom@40: Constructing an event works terom@40: """ terom@40: terom@40: self.assertTrue(event(self.ev_base, self.fd_read, EV_READ)) terom@40: terom@40: def test_read (self) : terom@40: """ terom@40: Read event on pipe with data fires terom@40: """ terom@40: terom@40: ev = MyEvent(self.ev_base, self.fd_read, EV_READ) terom@40: ev.add(0.1) # 100ms terom@40: terom@40: # trigger terom@40: os.write(self.fd_write, "foo") terom@40: terom@40: # loop terom@40: self.ev_base.loop(nonblock=True) terom@40: terom@40: # test terom@40: self.assertEquals(ev.ev_out, EV_READ) terom@40: terom@40: def test_read_timeout (self) : terom@40: """ terom@40: Read event on empty pipe timeouts terom@40: """ terom@40: terom@40: ev = MyEvent(self.ev_base, self.fd_read, EV_READ) terom@40: ev.add(0.0) terom@40: terom@40: # loop terom@40: self.ev_base.loop(once=True) terom@40: terom@40: # test terom@40: self.assertEquals(ev.ev_out, EV_TIMEOUT) terom@40: terom@40: def test_read_nonblock (self) : terom@40: """ terom@40: Read event on empty pipe doesn't block loop with nonblock=True terom@40: """ terom@40: terom@40: ev = MyEvent(self.ev_base, self.fd_read, EV_READ) terom@40: ev.add() terom@40: terom@40: # loop once terom@40: self.ev_base.loop(nonblock=True) terom@40: terom@40: # test terom@40: self.assertEquals(ev.ev_out, None) terom@42: terom@40: if __name__ == '__main__' : terom@40: unittest.main()