qmsk/net/transport/tcp.py
author Tero Marttila <terom@fixme.fi>
Mon, 31 Aug 2009 22:19:16 +0300
branch connect-async
changeset 45 bb49bf8222ed
parent 37 14db3fe42b6c
permissions -rw-r--r--
initial async connect attempt
"""
    TCP service/client implementation.
"""

from qmsk.net.transport import service, client, stream, socket

# Default backlog length handed to listen() when Service is constructed without an explicit listen_backlog.
# XXX: the value is arbitrary (number pulled out of a hat), not tuned.
LISTEN_BACKLOG = 5

class Connection (socket.Stream, stream.Stream) :
    """
        A TCP connection exposed through the Stream interface.
    """

    def __init__ (self, sock) :
        """
            Wrap a socket that already exists and has already been connected.

                sock            - the connected socket to use for this stream.
        """

        self._init_sock(sock)

    def shutdown (self, how) :
        """
            Shut down one or both directions of the full-duplex TCP connection.

                how             - one of socket.SHUT_*, selecting the read side, the write side, or both.
        """

        connection_sock = self.sock

        connection_sock.shutdown(how)

class Service (socket.Service, service.Service) :
    """
        The TCP flavour of Service: a listening stream socket that hands out Connections.
    """

    _SOCKTYPE = socket.SOCK_STREAM

    def __init__ (self, endpoint, listen_backlog=LISTEN_BACKLOG, family=None) :
        """
            Set up the listening socket for the given local endpoint, and start listening with the given backlog.

                endpoint        - local Endpoint to bind() to; giving just the port is usually enough.
                listen_backlog  - backlog length passed on to socket.listen()
                family          - (optional) address family, used when there is no endpoint

            As a special case, the endpoint may be None. Then no socket.bind() takes place; a socket of the given
            address family (which *MUST* be supplied in that case) is created, and it is left to the OS to pick a
            local address at .listen() time.

            Errors from the bind() or listen() operations propagate to the caller.
        """

        # step 1: get a socket suitable for (and bound to) the endpoint
        self._init_endpoint(endpoint, family=family)

        # step 2: put it into listening mode
        self._listen(listen_backlog)

    def accept (self, cls=Connection) :
        """
            accept() one incoming connection on our socket and wrap it.

                cls             - factory called with the accepted socket; defaults to tcp.Connection.

            Returns whatever cls(sock) returns.
        """

        # XXX: trap and raise a ServiceAcceptError?
        # XXX: the peer address is currently discarded
        client_sock, _peer_addr = self.sock.accept()

        # wrap the accepted socket as a new Stream
        return cls(client_sock)

    def close (self) :
        """
            Close the listening socket; the Service cannot be used after this.

            Any error raised by the underlying socket.close() propagates to the caller.
        """

        listening_sock = self.sock

        listening_sock.close()

class Connector (socket.Connect) :
    """
        The connect() state machine.

        Either call connect() for a one-shot synchronous connect, or drive it step by step: start() resolves the
        target endpoint, operate() tries the resolved addresses in turn, and build_error() produces the summary
        error once every address has failed.

        Per-operation state lives in self._remote_endpoint, self._ais and self._errors; it is created by start()
        and destroyed by cleanup().
    """

    def __init__ (self, bind_endpoint=None, family=None) :
        """
            Optionally prepare a local socket ahead of connecting.

                bind_endpoint   - (optional) local Endpoint to bind() to before connecting.
                family          - (optional) address family to create the socket with.

            When neither is given, no socket is created here; operate() then creates one per address tried.
        """

        # bind()?
        if bind_endpoint or family :
            # construct a socket as defined
            self._init_endpoint(bind_endpoint, family=family)
 
    def start (self, endpoint) :
        """
            Start connecting to the given endpoint.

                endpoint        - remote Endpoint to resolve and connect to.
        """

        # remember the target, build_error() reports it
        self._remote_endpoint = endpoint

        # connect()-time errors
        self._errors = []
        
        # resolve the list of addresses to try and connect to; kept as an iterator so that repeated operate()
        # calls continue with the next untried address instead of starting over
        self._ais = iter(self._resolve_endpoint(endpoint))
    
    def cleanup (self) :
        """
            Destroy the per-operation state created by start().

            Raises AttributeError if there is no such state (start() not called, or already cleaned up).
        """

        del self._remote_endpoint
        del self._errors
        del self._ais

    def build_error (self) :
        """
            Build and return an error object for this connect operation, and cleanup.

            The error carries the target endpoint and the list of per-address errors collected by operate().
        """

        error = socket.SocketConnectEndpointError(self._remote_endpoint, self._errors)
        
        self.cleanup()

        return error

    def operate (self, nonblocking=True) :
        """
            Try and connect to each of our AddrInfo's in turn, collecting any errors, until we either run out of
            addresses to try, or we manage to connect().

                nonblocking - perform non-blocking connect()'s, so put the socket into non-blocking mode and treat
                              EINPROGRESS as a successful connect().

            Returns True if we managed to connect, otherwise False. On success, the socket is in self.sock.
        """
        
        # get next addrinfo to try
        for ai in self._ais :
            # a socket only exists up-front if __init__ was asked to create/bind one
            sock = getattr(self, 'sock', None)

            try :
                if sock :
                    # try and connect the existing socket
                    self._connect_sock_addrinfo(sock, ai, nonblocking)
                
                else :
                    # create a new socket and connect it
                    self.sock = self._connect_addrinfo(ai, nonblocking)
            
            # NOTE(review): assumed to live next to SocketConnectEndpointError in the socket module - confirm
            except socket.SocketConnectAddrinfoError as error :
                # log it
                self._errors.append(error)

                # try the next one
                continue
            
            else :
                # yay!
                return True

        # unable to connect anywhere, nothing left to try
        return False
    
    def next (self) :
        """
            Operate asynchronously: perform one operate() pass.

            XXX: failure is not reported anywhere yet; the result of operate() is dropped.
        """

        if not self.operate() :
            # TODO: report the failure, e.g. via build_error()
            pass
        

    def connect (self, endpoint) :
        """
            Operate synchronously, either raising an error, or returning a socket.

                endpoint        - remote Endpoint to connect to.

            Raises the error from build_error() if none of the resolved addresses could be connected to.
        """
        
        # init
        self.start(endpoint)

        # blocking connect: per operate()'s contract, a non-blocking attempt would count EINPROGRESS as success
        # and hand back a socket that is not connected yet
        if not self.operate(nonblocking=False) :
            # nay :(
            raise self.build_error()

        # yay :)
        return self.sock
        
class Client (client.Client) :
    """
        An implementation of Client for TCP sockets.
    """

    _SOCKTYPE = socket.SOCK_STREAM

    def __init__ (self, connect_endpoint, bind_endpoint=None, family=None) :
        """
            Construct a client, connecting to the given remote endpoint.

                connect_endpoint    - remote Endpoint to connect() to.
                bind_endpoint       - (optional) local Endpoint to bind() to before connecting.
                family              - (optional) family to create sockaddr for

            Nothing is connected here; the parameters are only stored for connect() / connect_async().
        """

        # store everything connect() and connect_async() read back
        self.connect_endpoint = connect_endpoint
        self.bind_endpoint = bind_endpoint
        self.family = family

        # older private name, still set for code that reads it
        self._connect_endpoint = connect_endpoint

    def _connect_next (self) :
        """
            Driver for the async connect process.

            Calls on_connect() on success, or on_error() with the connect error on failure.

            NOTE(review): _connect, _connect_deinit and _connect_error are not defined in this class; presumably
            they come from the client.Client base - confirm.
        """

        # 'async' is a reserved word from Python 3.7 on, so pass it via a dict; the call itself is unchanged
        if self._connect(**{'async': True}) :
            # yay :)
            self._connect_deinit()

            self.on_connect()

        else :
            # nay :(
            self.on_error(self._connect_error())

    def connect (self, cls=Connection) :
        """
            Perform a synchronous connect() operation.

                cls                 - factory called with the connected socket; defaults to tcp.Connection.

            Returns whatever cls(sock) returns; errors raised by Connector.connect() propagate.
        """

        connector = Connector(self.bind_endpoint, self.family)

        sock = connector.connect(self.connect_endpoint)

        return cls(sock)

    def connect_async (self, reactor=None) :
        """
            Perform an asynchronous connect() operation, returning a Connector object.

                reactor             - currently unused.

            The returned Connector has been start()ed on the remote endpoint, but no connect attempt has been
            made yet.
        """

        connector = Connector(self.bind_endpoint, self.family)

        connector.start(self.connect_endpoint)

        return connector

    def on_connect (self) :
        """
            Successfully connected. Does nothing by default.
        """

        pass

    def on_error (self, error) :
        """
            Connection failed. Does nothing by default.

                error               - the error describing the failed connect.
        """
        
        pass