author | Tero Marttila <terom@fixme.fi> |
Sun, 23 Aug 2009 22:59:40 +0300 | |
changeset 37 | 14db3fe42b6c |
parent 36 | 4d5c02fe9c27 |
child 38 | f0fc793a3754 |
permissions | -rw-r--r-- |
34 | 1 |
import unittest |
2 |
||
36
4d5c02fe9c27
add test for IPv6 -> IPv4 connect failure
Tero Marttila <terom@fixme.fi>
parents:
35
diff
changeset
|
3 |
from qmsk.net.socket import socket as _socket, af_inet, af_inet6 |
34 | 4 |
from qmsk.net.transport import tcp, socket, endpoint |
5 |
||
6 |
class TestService(unittest.TestCase):
    """Tests for tcp.Service created with family=socket.AF_INET."""

    def setUp(self):
        # create service on random port
        self.ss = tcp.Service(None, family=socket.AF_INET)

        # address the service socket reports for itself; used by the tests
        self.addr = self.ss.sock.getsockname()

    def test_bind_addr(self):
        """The service socket reports the address family it was created with."""
        # ensure it binds on the right AF
        self.assertEqual(self.addr.family, socket.AF_INET)

    def test_connect_accept(self):
        """A plain socket can connect to the service, which then accepts it."""
        # ensure we can connect to it...
        sock = _socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect(self.addr)

        # and accept
        sc = self.ss.accept()

        # test both ways: each side's peer port must match the other side's
        # own port
        self.assertEqual(sock.getpeername().port, self.addr.port)
        self.assertEqual(sc.sock.getpeername().port, sock.getsockname().port)
|
28 |
||
29 |
class TestClient(unittest.TestCase):
    """Tests for tcp.Client connecting to a local listening socket."""

    def setUp(self):
        # create listening socket on random port
        self.ls = _socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.ls.listen(1)

        # the listener's address, both as a raw sockaddr (for comparisons)
        # and wrapped as an endpoint (for passing to tcp.Client)
        self.sockaddr = af_inet.sockaddr_in('127.0.0.1', self.ls.getsockname().port)
        self.addr = endpoint.SockAddr(self.sockaddr)

    def test_connect(self):
        """Connecting to the listener's endpoint yields the listener as peer."""
        cc = tcp.Client(self.addr)
        cs = cc.connect()

        self.assertEqual(cs.sock.getpeername(), self.sockaddr)

    def test_connect_bind(self):
        """Connecting with a bind endpoint uses that local address."""
        # NOTE(review): assumes listener port + 1 is free on 127.0.0.1 - confirm
        sockaddr = af_inet.sockaddr_in('127.0.0.1', self.sockaddr.port + 1)

        # connect with bind()
        cc = tcp.Client(self.addr, bind_endpoint=endpoint.SockAddr(sockaddr))
        cs = cc.connect()

        self.assertEqual(cs.sock.getsockname(), sockaddr)
        self.assertEqual(cs.sock.getpeername(), self.sockaddr)

    def test_connect_bind6(self):
        """Binding to an IPv6 address and connecting to the IPv4 listener fails."""
        sockaddr = af_inet6.sockaddr_in6('::1', self.sockaddr.port + 1)

        # connect with bind()
        cc = tcp.Client(self.addr, bind_endpoint=endpoint.SockAddr(sockaddr))

        # should fail (IPv6 -> IPv4)
        self.assertRaises(socket.SocketConnectEndpointError, cc.connect)

    def test_connect_inet(self):
        """Connecting via an InetAddr for 'localhost' reaches the listener."""
        cc = tcp.Client(endpoint.InetAddr('localhost', self.sockaddr.port))
        cs = cc.connect()

        self.assertEqual(cs.sock.getpeername(), self.sockaddr)
50dc1517f797
fix endpoint.InetAddr.resolve
Tero Marttila <terom@fixme.fi>
parents:
34
diff
changeset
|
68 |
|
36
4d5c02fe9c27
add test for IPv6 -> IPv4 connect failure
Tero Marttila <terom@fixme.fi>
parents:
35
diff
changeset
|
69 |
|
34 | 70 |
if __name__ == '__main__':
    # executed as a script: let unittest discover and run the tests above
    unittest.main()
|
72 |