[transport] initial TCP implementation
authorTero Marttila <terom@fixme.fi>
Fri, 21 Aug 2009 00:30:06 +0300
changeset 28 020c89baaa33
parent 27 12468e38227e
child 29 4e8adf792802
[transport] initial TCP implementation
qmsk/net/transport/__init__.py
qmsk/net/transport/client.py
qmsk/net/transport/service.py
qmsk/net/transport/socket.py
qmsk/net/transport/stream.py
qmsk/net/transport/tcp.py
qmsk/net/transport/transport.py
--- a/qmsk/net/transport/__init__.py	Fri Aug 21 00:29:25 2009 +0300
+++ b/qmsk/net/transport/__init__.py	Fri Aug 21 00:30:06 2009 +0300
@@ -1,7 +1,9 @@
 """
     Transport-layer functionality.
 
-    This implements TCP/UDP/SCTP functionality, plus relevant Transport-Application layer stuff like TLS, SOCKS, SSH-channels, etc.
+    This implements the core TCP/UDP/SCTP functionality, plus relevant transport/application layer stuff like TLS, SOCKS, SSH-channels, etc.
+
+    XXX: non-blocking interface?
 
     Firstly, this defines a set of abstract interfaces that model the actual implementation as built on top of the
     socket packages's network-layer implementation:
@@ -14,6 +16,8 @@
             ConnectedTransport      - connection-oriented transport
             
                 Stream                  - connection-oriented full-duplex byte stream (reliable, sequenced)
+                    
+                    BufferedStream          - buffered version of the above
     
                 PacketStream            - connection-oriented full-duplex packet-oriented communication
  
@@ -95,5 +99,10 @@
         Stream          - tcp.Stream carried over a SOCKSv5 connections
 
         Client          - tcp.Client connecting via a SOCKSv5 server
-        
+
+
+
+
+    TODO:
+        * implement the above
 """
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/qmsk/net/transport/client.py	Fri Aug 21 00:30:06 2009 +0300
@@ -0,0 +1,32 @@
+"""
+    Abstract Client interface
+"""
+
+class Client (object) :
+    """
+        A Client establishes a connection to some remote Service.
+        
+        This is an abstract base class that is further impemented by e.g. TCP or SSL.
+
+        Note that a Client behaves more like a factory, such that the Client itself is not a Transport, one must instead
+        call .connect(), rather like one must call Service.accept().
+    """
+
+    def __init__ (self, endpoint) :
+        """
+            Connect to the given remote endpoint.
+
+                endpoint        - the remote Endpoint to connect to
+        """
+
+        raise NotImplementedError()
+
+    def connect (self, cls=None) :
+        """
+            Perform the connect operation, returning a new Transport.
+                
+                cls         - optional transport-specific type to use for the new connection.
+        """
+
+        raise NotImplementedError()
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/qmsk/net/transport/service.py	Fri Aug 21 00:30:06 2009 +0300
@@ -0,0 +1,47 @@
+"""
+    Abstract Service interface
+"""
+
+class ServiceError (Exception) :
+    """
+        Service-related error.
+    """
+
+class Service (object) :
+    """
+        A Service listens for incoming connections, handing those connections off as separate objects.
+
+        This is an abstract base class that is further impemented by e.g. TCP or SSL.
+    """
+
+    def __init__ (self, endpoint) :
+        """
+            Listen on the given local endpoint.
+
+                endpoint    - the local Endpoint to provide this service on.
+        """
+
+        raise NotImplementedError()
+
+    def accept (self, cls=None) :
+        """
+            Accept and return a new Transport object representing a connected remote Client.
+
+                cls         - optional transport-specific type to use for the new connection.
+        """
+
+        raise NotImplementedError()
+    
+    def close (self) :
+        """
+            Explicitly shut down this Service, closing any underlying resources and refusing any new connections.
+
+            This will happen implicitly if the Service is destroyed, this is just the explicit interface.
+
+            This invalidates this Service object for future use.
+
+            This may, in theory, raise an error, which would be supressed if this Service were destroyed.
+        """
+        
+        raise NotImplementedError()
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/qmsk/net/transport/socket.py	Fri Aug 21 00:30:06 2009 +0300
@@ -0,0 +1,359 @@
+"""
+    Socket implementation helpers
+"""
+
+from qmsk.net.socket import socket
+from qmsk.net.socket.constants import *
+
+class SocketError (Exception) :
+    """
+        Base class of errors raised by the Socket classes.
+    """
+
+class SocketBindAddrinfoError (SocketError) :
+    """
+        The socket was unable to socket()+bind() to the given addrinfo.
+    """
+
+    def __init__ (self, addrinfo, error) :
+        """
+            addrinfo            - the addrinfo we tried to use
+            error               - the resulting error
+        """
+
+        self.addrinfo = addrinfo
+        self.error = error
+
+    def __str__ (self) :
+        return "Unable to bind() to %s: %s" % (self.addrinfo, self.error)
+
+class SocketBindEndpointError (SocketError) :
+    """
+        The socket was unable to bind() to the any of the given endpoint's addresses.
+    """
+
+    def __init__ (self, endpoint, errors) :
+        """
+            endpoint            - the endpoint we tried to bind to
+            errors              - a sequence of ServiceBindAddrinfoErrors describing the failed bind attempts.
+        """
+
+        self.endpoint = endpoint
+        self.errors = errors
+
+    def __str__ (self) :
+        # XXX: too verbose?
+        return "Unable to bind() to any addresses on endpoint %s:\n%s" % (self.endpoint, "\n".join(
+            "\t%s" % (error, ) for error in self.errors
+        ))
+
+
+class Base (object) :
+    """
+        Base class for all other socket-related classes, contains the underlying socket object.
+    """
+
+    # the underlying socket object
+    sock = None
+    
+    def _init_sock (self, sock) :
+        """
+            Initialize with the given pre-existing socket.
+        """
+
+        self.sock = sock
+
+
+class Common (Base) :
+    """
+        Common operations for Client/Service
+    """
+
+    @classmethod
+    def _socket (self, family, socktype, protocol = 0) :
+        """
+            Construct and return a new socket object using the given parameters.
+        """
+
+        return socket.socket(family, socktype, protocol)
+
+    @classmethod
+    def _bind_addrinfo (cls, ai) :
+        """
+            This will attempt to create a new socket and bind it, based on the given addrinfo, returning the new socket.
+
+            Raises a ServiceBindAddrinfoError if this fails
+        """
+
+        try :
+            # socket()
+            sock = self._socket(ai.family, ai.socktype, ai.protocol)
+
+            # bind()
+            sock.bind(ai.addr)
+
+        # XXX: except socket.error as e :
+        except OSError, error : 
+            raise SocketBindAddrinfoError(ai, error)
+           
+        else :
+            return sock
+
+    @classmethod
+    def _bind_endpoint (cls, endpoint, family, socktype, protocol=0) :
+        """
+            This will resolve the given endpoint, and attempt to create and bind a suitable socket and return it.
+
+                endpoint        - local Endpoint to bind() to.
+                family          - socket address family to use.
+                socktype        - socket type to use
+                protocol        - (optional) specific protocol
+
+            Raises a ServiceBindError if this is unable to create a bound socket.
+
+            XXX: bind to all of the given endpoint's addresses instead of just one...?
+        """
+        
+        errors = []
+        
+        # resolve the endpoint and try socket+bind
+        for ai in endpoint.getaddrinfo(family, socktype, protocol, AI_PASSIVE) :
+            try :
+                # try to socket+bind this addrinfo
+                sock = cls._bind_addrinfo(ai)
+            
+            except SocketBindAddrinfoError, error :
+                # collect
+                errors.append(error)
+                
+                # keep trying
+                continue
+
+            else :
+                # got a working socket :)
+                return sock
+       
+        else :
+            # no suitable address found :(
+            raise SocketBindEndpointError(endpoint, errors)
+    
+    def _init_endpoint (self, endpoint, family, socktype, protocol = 0) :
+        """
+            Initialize this socket by constructing a new socket with the given parameters, bound to the given endpoint,
+            if given. If no endpoint is given, this simply creates a socket with the given settings and does not bind
+            it anywhere.
+        """
+
+        # create local socket
+        if endpoint :
+            # create a suitable socket bound to a the given endpoint
+            self.sock = self._bind_endpoint(endpoint, family, socktype, protocol)
+
+        else :
+            # create a suitable socket not bound to anything
+            self.sock = self._socket(family, socktype, protocol)
+
+
+class Service (Common) :
+    """
+        Listener socket
+    """
+
+    def _listen (self, backlog) :
+        """
+            Puts this socket into listen() mode with the given backlog.
+        """
+        
+        self.sock.listen(backlog)
+
+class SocketConnectAddrinfoError (SocketError) :
+    """
+        The socket was unable to socket()+connect() to the given addrinfo.
+    """
+
+    def __init__ (self, addrinfo, error) :
+        """
+            addrinfo            - the addrinfo we tried to use
+            error               - the resulting error
+        """
+
+        self.addrinfo = addrinfo
+        self.error = error
+
+    def __str__ (self) :
+        return "Unable to connect() to %s: %s" % (self.addrinfo, self.error)
+
+class SocketConnectEndpointError (SocketError) :
+    """
+        The socket was unable to connect() to the any of the given endpoint's addresses.
+    """
+
+    def __init__ (self, endpoint, errors) :
+        """
+            endpoint            - the endpoint we tried to connect to
+            errors              - a sequence of ServiceBindAddrinfoErrors describing the failed connect attempts.
+        """
+
+        self.endpoint = endpoint
+        self.errors = errors
+
+    def __str__ (self) :
+        # XXX: too verbose?
+        return "Unable to connect() to any addresses on endpoint %s:\n%s" % (self.endpoint, "\n".join(
+            "\t%s" % (error, ) for error in self.errors
+        ))
+
+
+class Client (Common) :
+    """
+        Connecting socket
+    """
+
+    @classmethod
+    def _connect_sock_addr (cls, sock, addr) :
+        """
+            Attempt to connect the given socket to the given address.
+        """
+
+        sock.connect(addr)
+
+    @classmethod
+    def _connect_sock_addrinfo (cls, sock, ai) :
+        """
+            Attempt to connect the given socket to the given addrinfo's address.
+        """
+
+        try :
+            cls._connect_sock_addr(sock, ai.addr)
+
+        # XXX: except socket.error as e :
+        except OSError, error :
+            raise SocketConnectAddrinfoError(ai, error)
+
+    @classmethod
+    def _connect_addrinfo (cls, ai) :
+        """
+            Attempt to create a socket and connect it based on the given addrinfo, returning the new socket is succesfull.
+        """
+
+        try :
+            # socket()
+            sock = cls._socket(ai.family, ai.socktype, ai.protocol)
+
+        # XXX: except socket.error as e :
+        except OSError, error : 
+            raise SocketConnectAddrinfoError(ai, error)
+
+
+        # try and connect() it
+        cls._connect_sock_addrinfo(sock, ai)
+        
+        # return once succesfully
+        return sock
+
+    @classmethod
+    def _connect_sock_endpoint (cls, sock, endpoint, family, socktype, protocol = 0) :
+        """
+            Connect this socket to the given remote endpoint, using the given parameters to resolve the endpoint.
+        """
+        
+        errors = []
+        
+        # resolve the endpoint and try socket+bind
+        for ai in endpoint.getaddrinfo(family, socktype, protocol) :
+            try :
+                # try to connect the socket to this addrinfo
+                cls._connect_sock_addrinfo(sock, ai)
+            
+            except SocketConnectAddrinfoError, error :
+                # collect
+                errors.append(error)
+                
+                # keep trying
+                continue
+
+            else :
+                # got a working socket :)
+                return
+
+        else :
+            # no suitable address found :(
+            raise SocketConnectEndpointError(endpoint, errors)
+    
+    @classmethod
+    def _connect_endpoint (self, endpoint, family, socktype, protocol = 0) :
+        """
+            Create a new socket and connect it to the given remote endpoint, using the given parameters to resolve the
+            endpoint.
+        """
+
+        errors = []
+        
+        # resolve the endpoint and try socket+bind
+        for ai in endpoint.getaddrinfo(family, socktype, protocol) :
+            try :
+                # try to socket+connect this addrinfo
+                sock = cls._connect_addrinfo(ai)
+            
+            except SocketConnectAddrinfoError, error :
+                # collect
+                errors.append(error)
+                
+                # keep trying
+                continue
+
+            else :
+                # got a working socket :)
+                return sock
+       
+        else :
+            # no suitable address found :(
+            raise SocketConnectEndpointError(endpoint, errors)
+
+    def _init_connect_endpoint (self, endpoint, family, socktype, protocol = 0):
+        """
+            If we already have an existing socket, connect it to the given endpoint, otherwise try and connect to the
+            given endpoint with a new socket.
+
+            There is a subtle difference here, because if we have e.g. an IPv4 socket and try and connect it to an
+            endpoint with both IPv6 and IPv4 addresses, we will try to connect to an IPv6 address using the IPv4 socket,
+            and then to the IPv4 address using the IPv6 socket.
+            
+            If we do not yet have a socket, then we will attempt to connect to the IPv6 address using an IPv6 socket,
+            and to the IPv4 address using an IPv4 socket.
+        """
+
+        if self.socket :
+            # connect with existing socket
+            self._connect_sock_endpoint(self.socket, endpoint, family, socktype, protocol)
+
+        else :
+            # connect with new socket
+            self._connect_endpoint(endpoint, family, socktype, protocol)
+
+class Stream (Base) :
+    """
+        Unbuffered byte-stream interface.
+    """
+   
+    def read (self, iov) :
+        return self.sock.read(iov)
+
+    def readv (self, iovecs) :
+        return self.sock.readv(iovecs)
+
+    def write (self, buf) :
+        return self.sock.write(buf)
+
+    def writev (self, iovecs) :
+        return self.sock.writev(iovecs)
+
+    def close (self) :
+        self.sock.close()
+
+    def abort (self) :
+        # XXX: SO_LINGER magic
+
+        raise NotImplementedError()
+
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/qmsk/net/transport/stream.py	Fri Aug 21 00:30:06 2009 +0300
@@ -0,0 +1,103 @@
+"""
+    Abstract byte-stream transport interface.
+"""
+
+from qmsk.net.transport import transport
+
+class Stream (transport.Transport) :
+    """
+        A byte stream oriented transport.
+
+        This provides a sequenced, reliable, full-duplex, connection-oriented byte stream.
+    """
+
+    def read (self, iov) :
+        """
+            Read and return up to `iov` bytes from this stream. This may return fewer bytes than requested.
+
+            If the stream does not contain any more data (i.e. EOF), an empty string is returned.
+        """
+
+        raise NotImplementedError()
+
+    def readv (self, iovecs) :
+        """
+            Attempt to read data into a sequence of iov's, returning a sequence of buffers with the read data.
+
+            This may return fewer buffers than specified if there was not sufficient data to fill all buffers.
+
+            By default, this is simply implemented using read(), but an implementation may also provide a more
+            efficient version.
+        """
+        
+        ret = []
+
+        for iovec in iovecs :
+            buf = self.read(iovec)
+
+            if buf :
+                ret.append(buf)
+
+            else :
+                # EOF! This should not be returned by readv!
+                break
+        
+        return ret
+
+    def write (self, buf) :
+        """
+            Attempt to write the given data to this stream, returning the number of bytes written, which may be less
+            than the length of the buffer given.
+        """
+
+        raise NotImplementedError()
+
+    def writev (self, iovecs) :
+        """
+            Attempt to write data from the given sequence of buffers to this stream, returning the total number of
+            bytes written, which may be less than the total length of the buffers given.
+
+            By default, this is simply implemented using write(), but an implementation may also provide a more
+            efficient version.
+        """
+
+        ret = 0
+
+        for buf in iovec :
+            # send this buffer
+            buf_len = self.write(buf)
+            
+            # count total bytes sent
+            ret += buf_len
+
+            if buf_len < len(buf) :
+                # buffer was sent incompletely, do not try and send any more
+                break
+        
+        return ret
+
+    def close (eslf) :
+        """
+            Close this stream, disallowing any more read/write operations.
+
+            This discards any unread data, but written data that has been buffered will still be sent.
+        """
+
+        raise NotImplementedError()
+
+    def abort (self) :
+        """
+            Close this stream immediately, invalidating it for future use.
+
+            This discards any unread data, and may discard written data that has been buffered and not yet sent. This
+            should also ignore any close-related errors.
+
+            By default, this is implemented using close(), but an implementation may also provide a more efficient
+            version.
+
+            XXX: remove default implementation?
+        """
+        
+        # XXX: ignore close-related errors?
+        self.close()
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/qmsk/net/transport/tcp.py	Fri Aug 21 00:30:06 2009 +0300
@@ -0,0 +1,118 @@
+"""
+    TCP service/client implementation.
+"""
+
+from qmsk.net.transport import service, client, stream, socket
+
+# default backlog for listen()
+# XXX: number pulled out of a hat
+LISTEN_BACKLOG = 5
+
+class Connection (socket.Stream, stream.Stream) :
+    """
+        Stream interface for a TCP connection
+    """
+
+    def __init__ (self, sock) :
+        """
+            Initialize with the given already-existing, connected, socket.
+        """
+
+        self._init_sock(sock)
+    
+    def shutdown (self, how) :
+        """
+            Selectively shut-down parts of all of the full-duplex TCP connection.
+
+                how             - one of socket.SHUT_* to shutdown read, write or both.
+        """
+
+        self.sock.shutdown(how)
+
+class Service (socket.Service, service.Service) :
+    """
+        An implementation of Service for TCP sockets.
+    """
+
+    def __init__ (self, endpoint, af=socket.AF_UNSPEC, listen_backlog=LISTEN_BACKLOG) :
+        """
+            Construct a service, bound to the given local endpoint and listening for incoming connections using the
+            given backlog.
+
+                endpoint        - local Endpoint to bind() to. Usually, it is enough to just specify the port.
+                af              - the socket address family to use (one of qmsk.net.socket.constants.AF_*)
+                listen_backlog  - backlog length argument to use for socket.listen()
+            
+
+            This will raise an error if the bind() or listen() operations fail.
+        """
+        
+        # construct a suitable socket bound to the given endpoint
+        self._init_endpoint(endpoint, af, socket.SOCK_STREAM)
+
+        # make us listen
+        self._listen(listen_backlog)
+
+        # ok, great
+
+    def accept (self, cls=Connection) :
+        """
+            Perform an accept() operation on our socket, returning a tcp.Connection.
+        """
+        
+        # XXX: trap and raise a ServiceAcceptError?
+        # XXX: what to do with addr?
+        sock, addr = self.sock.accept()
+        
+        # construct the new Stream
+        return cls(sock)
+
+    def close (self) :
+        """
+            Close the underlying socket object, invalidating this Service for future use.
+
+            This will raise if the underlying socket.close() operation does.
+        """
+
+        self.sock.close()
+
+
+class Client (socket.Client, client.Client) :
+    """
+        An implementation of Client for TCP sockets.
+    """
+
+    def __init__ (self, connect_endpoint, af=socket.AF_UNSPEC, bind_endpoint=None) :
+        """
+            Construct a client, connecting to the given remote endpoint.
+
+                connect_endpoint    - remote Endpoint to connect() to.
+                af                  - socket address family to use (one of qmsk.net.socket.constants.AF_*)
+                bind_endpoint       - (optional) local Endpoint to bind() to before connecting.
+
+        """
+
+        # store
+        self.connect_endpoint = connect_endpoint
+        self.af = af
+        self.bind_endpoint = bind_endpoint
+
+    def connect (self, cls=Connection) :
+        """
+            Perform the connect() operation, returning a tcp.Connection.
+        """
+
+        if self.bind_endpoint :
+            # construct a suitable local socket, bound to a specific endpoint
+            sock = self._bind_endpoint(self.bind_endpoint, self.af, socket.SOCK_STREAM)
+
+            # connect it to the remote endpoint
+            self._connect_sock_endpoint(sock, self.connect_endpoint, self.af, socket.SOCK_STREAM)
+
+        else :
+            # let _init_connect_endpoint pick a socket to use
+            sock = self._connect_endpoint(self.connect_endpoint, self.af, socket.SOCK_STREAM)
+        
+        # construct
+        return cls(sock)
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/qmsk/net/transport/transport.py	Fri Aug 21 00:30:06 2009 +0300
@@ -0,0 +1,11 @@
+"""
+    Abstract transport interface
+"""
+
+class Transport (object) :
+    """
+        A very abstract interface, which doesn't specify much more than being able to send data in some form somewhere.
+
+        See Stream/??? for more concrete interfaces.
+    """
+