initial async connect attempt connect-async
authorTero Marttila <terom@fixme.fi>
Mon, 31 Aug 2009 22:19:16 +0300
branchconnect-async
changeset 45 bb49bf8222ed
parent 44 01ac7755b15a
initial async connect attempt
qmsk/net/transport/client.py
qmsk/net/transport/socket.py
qmsk/net/transport/tcp.py
--- a/qmsk/net/transport/client.py	Mon Aug 31 22:17:54 2009 +0300
+++ b/qmsk/net/transport/client.py	Mon Aug 31 22:19:16 2009 +0300
@@ -30,3 +30,6 @@
 
         raise NotImplementedError()
 
+    def connect_async (self, cls=None, reactor=None) :
+        raise NotImplementedError()
+
--- a/qmsk/net/transport/socket.py	Mon Aug 31 22:17:54 2009 +0300
+++ b/qmsk/net/transport/socket.py	Mon Aug 31 22:19:16 2009 +0300
@@ -5,6 +5,8 @@
 from qmsk.net.socket import socket
 from qmsk.net.socket.constants import *
 
+import errno
+
 class SocketError (Exception) :
     """
         Base class of errors raised by the Socket classes.
@@ -62,7 +64,21 @@
         """
 
         self.sock = sock
+    
+    def _set_nonblocking (self, nonblocking) :
+        """
+            Set/unset non-blocking I/O mode.
+        """
+        
+        # XXX: implement elsewhere
+        import fcntl, os
 
+        flags = 0
+
+        if nonblocking :
+            flags |= os.O_NONBLOCK
+
+        fcntl.fcntl(self.sock, fcntl.F_SETFL, flags)
 
 class Common (Base) :
     """
@@ -104,6 +120,19 @@
            
         else :
             return sock
+    
+    @classmethod
+    def _resolve_endpoint (cls, endpoint, socktype = None, protocol = 0, flags = 0) :
+        """
+            Yield an *iterator* of AddrInfo objects by resolving the given endpoint endpoint using parameters suitable
+            for this socket.
+        """
+
+        if socktype is None :
+            socktype = cls._SOCKTYPE
+        
+        for ai in endpoint.resolve(socktype, protocol, flags) :
+            yield ai
 
     @classmethod
     def _bind_endpoint (cls, endpoint, socktype = None, protocol=0) :
@@ -121,11 +150,8 @@
         
         errors = []
         
-        if socktype is None :
-            socktype = cls._SOCKTYPE
-        
         # resolve the endpoint and try socket+bind
-        for ai in endpoint.resolve(socktype, protocol, AI_PASSIVE) :
+        for ai in cls._resolve_endpoint(endpoint, socktype, protocol, AI_PASSIVE) :
             try :
                 # try to socket+bind this addrinfo
                 sock = cls._bind_addrinfo(ai)
@@ -153,6 +179,8 @@
         """
         
         if endpoint is not None :
+            assert not family
+
             # create a suitable socket bound to a the given endpoint
             self.sock = self._bind_endpoint(endpoint, socktype, protocol)
         
@@ -212,7 +240,7 @@
         ))
 
 
-class Client (Common) :
+class Connect (Common) :
     """
         Connecting socket
     """
@@ -222,13 +250,15 @@
         """
             Attempt to connect the given socket to the given address.
         """
-
+        
         sock.connect(addr)
 
     @classmethod
-    def _connect_sock_addrinfo (cls, sock, ai) :
+    def _connect_sock_addrinfo (cls, sock, ai, nonblocking=False) :
         """
             Attempt to connect the given socket to the given addrinfo's address.
+                
+                nonblocking - treat EINPROGRESS as success
         """
 
         try :
@@ -236,10 +266,14 @@
 
         # XXX: except socket.error as e :
         except OSError, error :
-            raise SocketConnectAddrinfoError(ai, error)
+            if nonblocking and error.errno == errno.EINPROGRESS :
+                return
+
+            else :
+                raise SocketConnectAddrinfoError(ai, error)
 
     @classmethod
-    def _connect_addrinfo (cls, ai) :
+    def _connect_addrinfo (cls, ai, nonblocking=False) :
         """
             Attempt to create a socket and connect it based on the given addrinfo, returning the new socket is succesfull.
         """
@@ -252,9 +286,8 @@
         except OSError, error : 
             raise SocketConnectAddrinfoError(ai, error)
 
-
         # try and connect() it
-        cls._connect_sock_addrinfo(sock, ai)
+        cls._connect_sock_addrinfo(sock, ai, nonblocking)
         
         # return once succesfully
         return sock
@@ -267,11 +300,8 @@
         
         errors = []
         
-        if socktype is None :
-            socktype = cls._SOCKTYPE
-
         # resolve the endpoint and try socket+bind
-        for ai in endpoint.resolve(socktype, protocol) :
+        for ai in cls._resolve_endpoint(endpoint, socktype, protocol) :
             try :
                 # try to connect the socket to this addrinfo
                 cls._connect_sock_addrinfo(sock, ai)
@@ -300,11 +330,8 @@
 
         errors = []
         
-        if socktype is None :
-            socktype = cls._SOCKTYPE
-
         # resolve the endpoint and try socket+bind
-        for ai in endpoint.resolve(socktype, protocol) :
+        for ai in cls._resolve_endpoint(endpoint, socktype, protocol) :
             try :
                 # try to socket+connect this addrinfo
                 sock = cls._connect_addrinfo(ai)
@@ -337,13 +364,13 @@
             and to the IPv4 address using an IPv4 socket.
         """
 
-        if self.socket :
+        if self.sock :
             # connect with existing socket
-            self._connect_sock_endpoint(self.socket, endpoint, socktype, protocol)
+            self._connect_sock_endpoint(self.sock, endpoint, socktype, protocol)
 
         else :
             # connect with new socket
-            self._connect_endpoint(endpoint, socktype, protocol)
+            self.sock = self._connect_endpoint(endpoint, socktype, protocol)
 
 class Stream (Base) :
     """
--- a/qmsk/net/transport/tcp.py	Mon Aug 31 22:17:54 2009 +0300
+++ b/qmsk/net/transport/tcp.py	Mon Aug 31 22:19:16 2009 +0300
@@ -81,43 +81,182 @@
 
         self.sock.close()
 
+class Connector (socket.Connect) :
+    """
+        The connect() state machine.
+    """
 
-class Client (socket.Client, client.Client) :
+    def __init__ (self, bind_endpoint=None, family=None) :
+        # bind()?
+        if bind_endpoint or family :
+            # construct a socket as defined
+            self._init_endpoint(bind_endpoint, family=family)
+ 
+    def start (self, endpoint) :
+        """
+            Start connecting to the given endpoint
+        """
+
+        # connect()-time errors
+        self._errors = []
+        
+        # resolve the list of addresses to try and connect to
+        self._ais = self._resolve_endpoint(endpoint)
+    
+    def cleanup (self) :
+        """
+            Destroy any used state
+        """
+
+        del self._errors
+        del self._ais
+
+    def build_error (self) :
+        """
+            Build and return an error object for this connect operation, and cleanup
+        """
+
+        error = socket.SocketConnectEndpointError(self._connect_endpoint, self._connect_errors)
+        
+        self.cleanup()
+
+        return error
+
+    def operate (self, nonblocking=True) :
+        """
+            Try and connect to each of our AddrInfo's in turn, collecting any errors, until we either run out of
+            addresses to try, or we manage to connect().
+
+                nonblocking - perform non-blocking connect()'s, so put the socket into non-blocking mode and treat
+                              EINPROGRESS as a succesful connect().
+
+            Returns True if we managed to connect, otherwise False.
+        """
+        
+        # get next addrinfo to try
+        for ai in self._connect_ais :
+            try :
+                if self.sock :
+                    # try and connect the existing socket
+                    self._connect_sock_addrinfo(self.sock, ai, nonblocking)
+                
+                else :
+                    # create a new socket and connect it
+                    self.sock = self._connect_addrinfo(ai, nonblocking)
+            
+            except SocketConnectAddrinfoError, error :
+                # log it
+                self._errors.append(error)
+
+                # try the next one
+                continue
+            
+            else :
+                # yay!
+                return True
+
+        else :
+            # unable to connect anywhere, nothing left to try
+            return False
+    
+    def next (self) :
+        """
+            Operate asynchronously, 
+        """
+
+        if not self.operate() :
+            # faail
+            pass
+        
+
+    def connect (self, endpoint) :
+        """
+            Operate synchronously, either raising an error, or returning a socket.
+        """
+        
+        # init
+        self.start(endpoint)
+
+        if not self.operate() :
+            # nay :(
+            raise self.build_error()
+
+        else :
+            # yay :)
+            return self.sock
+        
+class Client (client.Client) :
     """
         An implementation of Client for TCP sockets.
     """
 
     _SOCKTYPE = socket.SOCK_STREAM
 
-    def __init__ (self, connect_endpoint, bind_endpoint=None) :
+    def __init__ (self, connect_endpoint, bind_endpoint=None, family=None) :
         """
             Construct a client, connecting to the given remote endpoint.
 
                 connect_endpoint    - remote Endpoint to connect() to.
                 bind_endpoint       - (optional) local Endpoint to bind() to before connecting.
-
+                family              - (optional) family to create sockaddr for
         """
 
         # store
-        self.connect_endpoint = connect_endpoint
-        self.bind_endpoint = bind_endpoint
+        self._connect_endpoint = connect_endpoint
+        
+
+   
+   
+  
+   
+    def _connect_next (self) :
+        """
+            Driver for the async connect process
+        """
+
+        if self._connect(async=True) :
+            # yay :)
+            self._connect_deinit()
+
+            self.on_connect()
+
+        else :
+            # nay :(
+            self.on_error(self._connect_error())
 
     def connect (self, cls=Connection) :
         """
-            Perform the connect() operation, returning a tcp.Connection.
+            Perform a synchronous connect() operation.
         """
 
-        if self.bind_endpoint :
-            # construct a suitable local socket, bound to a specific endpoint
-            sock = self._bind_endpoint(self.bind_endpoint)
+        connector = Connector(self.bind_endpoint, self.family)
 
-            # connect it to the remote endpoint
-            self._connect_sock_endpoint(sock, self.connect_endpoint)
+        sock = connector.connect(self.connect_endpoint)
 
-        else :
-            # let _init_connect_endpoint pick a socket to use
-            sock = self._connect_endpoint(self.connect_endpoint)
-        
-        # construct
         return cls(sock)
 
+    def connect_async (self, reactor=None) :
+        """
+            Perform an asynchronous connect() operation, returning a Connector object.
+        """
+
+        connector = Connector(self.bind_endpoint, self.family)
+
+        connector.start(self.connect_endpoint)
+
+        return connector
+
+    def on_connect (self) :
+        """
+            Succesfully connected.
+        """
+
+        pass
+
+    def on_error (self, error) :
+        """
+            Connection failed.
+        """
+        
+        pass
+