--- a/qmsk/net/transport/client.py Mon Aug 31 22:17:54 2009 +0300
+++ b/qmsk/net/transport/client.py Mon Aug 31 22:19:16 2009 +0300
@@ -30,3 +30,6 @@
raise NotImplementedError()
+ def connect_async (self, cls=None, reactor=None) :
+ raise NotImplementedError()
+
--- a/qmsk/net/transport/socket.py Mon Aug 31 22:17:54 2009 +0300
+++ b/qmsk/net/transport/socket.py Mon Aug 31 22:19:16 2009 +0300
@@ -5,6 +5,8 @@
from qmsk.net.socket import socket
from qmsk.net.socket.constants import *
+import errno
+
class SocketError (Exception) :
"""
Base class of errors raised by the Socket classes.
@@ -62,7 +64,21 @@
"""
self.sock = sock
+
+ def _set_nonblocking (self, nonblocking) :
+ """
+ Set/unset non-blocking I/O mode.
+ """
+
+ # XXX: implement elsewhere
+ import fcntl, os
+ flags = 0
+
+ if nonblocking :
+ flags |= os.O_NONBLOCK
+
+ fcntl.fcntl(self.sock, fcntl.F_SETFL, flags)
class Common (Base) :
"""
@@ -104,6 +120,19 @@
else :
return sock
+
+ @classmethod
+ def _resolve_endpoint (cls, endpoint, socktype = None, protocol = 0, flags = 0) :
+ """
+ Yield an *iterator* of AddrInfo objects by resolving the given endpoint endpoint using parameters suitable
+ for this socket.
+ """
+
+ if socktype is None :
+ socktype = cls._SOCKTYPE
+
+ for ai in endpoint.resolve(socktype, protocol, flags) :
+ yield ai
@classmethod
def _bind_endpoint (cls, endpoint, socktype = None, protocol=0) :
@@ -121,11 +150,8 @@
errors = []
- if socktype is None :
- socktype = cls._SOCKTYPE
-
# resolve the endpoint and try socket+bind
- for ai in endpoint.resolve(socktype, protocol, AI_PASSIVE) :
+ for ai in cls._resolve_endpoint(endpoint, socktype, protocol, AI_PASSIVE) :
try :
# try to socket+bind this addrinfo
sock = cls._bind_addrinfo(ai)
@@ -153,6 +179,8 @@
"""
if endpoint is not None :
+ assert not family
+
# create a suitable socket bound to a the given endpoint
self.sock = self._bind_endpoint(endpoint, socktype, protocol)
@@ -212,7 +240,7 @@
))
-class Client (Common) :
+class Connect (Common) :
"""
Connecting socket
"""
@@ -222,13 +250,15 @@
"""
Attempt to connect the given socket to the given address.
"""
-
+
sock.connect(addr)
@classmethod
- def _connect_sock_addrinfo (cls, sock, ai) :
+ def _connect_sock_addrinfo (cls, sock, ai, nonblocking=False) :
"""
Attempt to connect the given socket to the given addrinfo's address.
+
+ nonblocking - treat EINPROGRESS as success
"""
try :
@@ -236,10 +266,14 @@
# XXX: except socket.error as e :
except OSError, error :
- raise SocketConnectAddrinfoError(ai, error)
+ if nonblocking and error.errno == errno.EINPROGRESS :
+ return
+
+ else :
+ raise SocketConnectAddrinfoError(ai, error)
@classmethod
- def _connect_addrinfo (cls, ai) :
+ def _connect_addrinfo (cls, ai, nonblocking=False) :
"""
Attempt to create a socket and connect it based on the given addrinfo, returning the new socket is succesfull.
"""
@@ -252,9 +286,8 @@
except OSError, error :
raise SocketConnectAddrinfoError(ai, error)
-
# try and connect() it
- cls._connect_sock_addrinfo(sock, ai)
+ cls._connect_sock_addrinfo(sock, ai, nonblocking)
# return once succesfully
return sock
@@ -267,11 +300,8 @@
errors = []
- if socktype is None :
- socktype = cls._SOCKTYPE
-
# resolve the endpoint and try socket+bind
- for ai in endpoint.resolve(socktype, protocol) :
+ for ai in cls._resolve_endpoint(endpoint, socktype, protocol) :
try :
# try to connect the socket to this addrinfo
cls._connect_sock_addrinfo(sock, ai)
@@ -300,11 +330,8 @@
errors = []
- if socktype is None :
- socktype = cls._SOCKTYPE
-
# resolve the endpoint and try socket+bind
- for ai in endpoint.resolve(socktype, protocol) :
+ for ai in cls._resolve_endpoint(endpoint, socktype, protocol) :
try :
# try to socket+connect this addrinfo
sock = cls._connect_addrinfo(ai)
@@ -337,13 +364,13 @@
and to the IPv4 address using an IPv4 socket.
"""
- if self.socket :
+ if self.sock :
# connect with existing socket
- self._connect_sock_endpoint(self.socket, endpoint, socktype, protocol)
+ self._connect_sock_endpoint(self.sock, endpoint, socktype, protocol)
else :
# connect with new socket
- self._connect_endpoint(endpoint, socktype, protocol)
+ self.sock = self._connect_endpoint(endpoint, socktype, protocol)
class Stream (Base) :
"""
--- a/qmsk/net/transport/tcp.py Mon Aug 31 22:17:54 2009 +0300
+++ b/qmsk/net/transport/tcp.py Mon Aug 31 22:19:16 2009 +0300
@@ -81,43 +81,182 @@
self.sock.close()
+class Connector (socket.Connect) :
+ """
+ The connect() state machine.
+ """
-class Client (socket.Client, client.Client) :
+ def __init__ (self, bind_endpoint=None, family=None) :
+ # bind()?
+ if bind_endpoint or family :
+ # construct a socket as defined
+ self._init_endpoint(bind_endpoint, family=family)
+
+ def start (self, endpoint) :
+ """
+ Start connecting to the given endpoint
+ """
+
+ # connect()-time errors
+ self._errors = []
+
+ # resolve the list of addresses to try and connect to
+ self._ais = self._resolve_endpoint(endpoint)
+
+ def cleanup (self) :
+ """
+ Destroy any used state
+ """
+
+ del self._errors
+ del self._ais
+
+ def build_error (self) :
+ """
+ Build and return an error object for this connect operation, and cleanup
+ """
+
+ error = socket.SocketConnectEndpointError(self._connect_endpoint, self._connect_errors)
+
+ self.cleanup()
+
+ return error
+
+ def operate (self, nonblocking=True) :
+ """
+ Try and connect to each of our AddrInfo's in turn, collecting any errors, until we either run out of
+ addresses to try, or we manage to connect().
+
+ nonblocking - perform non-blocking connect()'s, so put the socket into non-blocking mode and treat
+ EINPROGRESS as a succesful connect().
+
+ Returns True if we managed to connect, otherwise False.
+ """
+
+ # get next addrinfo to try
+ for ai in self._connect_ais :
+ try :
+ if self.sock :
+ # try and connect the existing socket
+ self._connect_sock_addrinfo(self.sock, ai, nonblocking)
+
+ else :
+ # create a new socket and connect it
+ self.sock = self._connect_addrinfo(ai, nonblocking)
+
+ except SocketConnectAddrinfoError, error :
+ # log it
+ self._errors.append(error)
+
+ # try the next one
+ continue
+
+ else :
+ # yay!
+ return True
+
+ else :
+ # unable to connect anywhere, nothing left to try
+ return False
+
+ def next (self) :
+ """
+ Operate asynchronously,
+ """
+
+ if not self.operate() :
+ # faail
+ pass
+
+
+ def connect (self, endpoint) :
+ """
+ Operate synchronously, either raising an error, or returning a socket.
+ """
+
+ # init
+ self.start(endpoint)
+
+ if not self.operate() :
+ # nay :(
+ raise self.build_error()
+
+ else :
+ # yay :)
+ return self.sock
+
+class Client (client.Client) :
"""
An implementation of Client for TCP sockets.
"""
_SOCKTYPE = socket.SOCK_STREAM
- def __init__ (self, connect_endpoint, bind_endpoint=None) :
+ def __init__ (self, connect_endpoint, bind_endpoint=None, family=None) :
"""
Construct a client, connecting to the given remote endpoint.
connect_endpoint - remote Endpoint to connect() to.
bind_endpoint - (optional) local Endpoint to bind() to before connecting.
-
+ family - (optional) family to create sockaddr for
"""
# store
- self.connect_endpoint = connect_endpoint
- self.bind_endpoint = bind_endpoint
+ self._connect_endpoint = connect_endpoint
+
+
+
+
+
+
+ def _connect_next (self) :
+ """
+ Driver for the async connect process
+ """
+
+ if self._connect(async=True) :
+ # yay :)
+ self._connect_deinit()
+
+ self.on_connect()
+
+ else :
+ # nay :(
+ self.on_error(self._connect_error())
def connect (self, cls=Connection) :
"""
- Perform the connect() operation, returning a tcp.Connection.
+ Perform a synchronous connect() operation.
"""
- if self.bind_endpoint :
- # construct a suitable local socket, bound to a specific endpoint
- sock = self._bind_endpoint(self.bind_endpoint)
+ connector = Connector(self.bind_endpoint, self.family)
- # connect it to the remote endpoint
- self._connect_sock_endpoint(sock, self.connect_endpoint)
+ sock = connector.connect(self.connect_endpoint)
- else :
- # let _init_connect_endpoint pick a socket to use
- sock = self._connect_endpoint(self.connect_endpoint)
-
- # construct
return cls(sock)
+ def connect_async (self, reactor=None) :
+ """
+ Perform an asynchronous connect() operation, returning a Connector object.
+ """
+
+ connector = Connector(self.bind_endpoint, self.family)
+
+ connector.start(self.connect_endpoint)
+
+ return connector
+
+ def on_connect (self) :
+ """
+ Succesfully connected.
+ """
+
+ pass
+
+ def on_error (self, error) :
+ """
+ Connection failed.
+ """
+
+ pass
+