# HG changeset patch # User Tero Marttila # Date 1250803806 -10800 # Node ID 020c89baaa3322aad374281d59baf51b261e546e # Parent 12468e38227eecd6f7390a4520f7b4ba3bf8982d [transport] initial TCP implementation diff -r 12468e38227e -r 020c89baaa33 qmsk/net/transport/__init__.py --- a/qmsk/net/transport/__init__.py Fri Aug 21 00:29:25 2009 +0300 +++ b/qmsk/net/transport/__init__.py Fri Aug 21 00:30:06 2009 +0300 @@ -1,7 +1,9 @@ """ Transport-layer functionality. - This implements TCP/UDP/SCTP functionality, plus relevant Transport-Application layer stuff like TLS, SOCKS, SSH-channels, etc. + This implements the core TCP/UDP/SCTP functionality, plus relevant transport/application layer stuff like TLS, SOCKS, SSH-channels, etc. + + XXX: non-blocking interface? Firstly, this defines a set of abstract interfaces that model the actual implementation as built on top of the socket packages's network-layer implementation: @@ -14,6 +16,8 @@ ConnectedTransport - connection-oriented transport Stream - connection-oriented full-duplex byte stream (reliable, sequenced) + + BufferedStream - buffered version of the above PacketStream - connection-oriented full-duplex packet-oriented communication @@ -95,5 +99,10 @@ Stream - tcp.Stream carried over a SOCKSv5 connections Client - tcp.Client connecting via a SOCKSv5 server - + + + + + TODO: + * implement the above """ diff -r 12468e38227e -r 020c89baaa33 qmsk/net/transport/client.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/qmsk/net/transport/client.py Fri Aug 21 00:30:06 2009 +0300 @@ -0,0 +1,32 @@ +""" + Abstract Client interface +""" + +class Client (object) : + """ + A Client establishes a connection to some remote Service. + + This is an abstract base class that is further impemented by e.g. TCP or SSL. + + Note that a Client behaves more like a factory, such that the Client itself is not a Transport, one must instead + call .connect(), rather like one must call Service.accept(). + """ + + def __init__ (self, endpoint) : + """ + Connect to the given remote endpoint. + + endpoint - the remote Endpoint to connect to + """ + + raise NotImplementedError() + + def connect (self, cls=None) : + """ + Perform the connect operation, returning a new Transport. + + cls - optional transport-specific type to use for the new connection. + """ + + raise NotImplementedError() + diff -r 12468e38227e -r 020c89baaa33 qmsk/net/transport/service.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/qmsk/net/transport/service.py Fri Aug 21 00:30:06 2009 +0300 @@ -0,0 +1,47 @@ +""" + Abstract Service interface +""" + +class ServiceError (Exception) : + """ + Service-related error. + """ + +class Service (object) : + """ + A Service listens for incoming connections, handing those connections off as separate objects. + + This is an abstract base class that is further impemented by e.g. TCP or SSL. + """ + + def __init__ (self, endpoint) : + """ + Listen on the given local endpoint. + + endpoint - the local Endpoint to provide this service on. + """ + + raise NotImplementedError() + + def accept (self, cls=None) : + """ + Accept and return a new Transport object representing a connected remote Client. + + cls - optional transport-specific type to use for the new connection. + """ + + raise NotImplementedError() + + def close (self) : + """ + Explicitly shut down this Service, closing any underlying resources and refusing any new connections. + + This will happen implicitly if the Service is destroyed, this is just the explicit interface. + + This invalidates this Service object for future use. + + This may, in theory, raise an error, which would be supressed if this Service were destroyed. + """ + + raise NotImplementedError() + diff -r 12468e38227e -r 020c89baaa33 qmsk/net/transport/socket.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/qmsk/net/transport/socket.py Fri Aug 21 00:30:06 2009 +0300 @@ -0,0 +1,359 @@ +""" + Socket implementation helpers +""" + +from qmsk.net.socket import socket +from qmsk.net.socket.constants import * + +class SocketError (Exception) : + """ + Base class of errors raised by the Socket classes. + """ + +class SocketBindAddrinfoError (SocketError) : + """ + The socket was unable to socket()+bind() to the given addrinfo. + """ + + def __init__ (self, addrinfo, error) : + """ + addrinfo - the addrinfo we tried to use + error - the resulting error + """ + + self.addrinfo = addrinfo + self.error = error + + def __str__ (self) : + return "Unable to bind() to %s: %s" % (self.addrinfo, self.error) + +class SocketBindEndpointError (SocketError) : + """ + The socket was unable to bind() to the any of the given endpoint's addresses. + """ + + def __init__ (self, endpoint, errors) : + """ + endpoint - the endpoint we tried to bind to + errors - a sequence of ServiceBindAddrinfoErrors describing the failed bind attempts. + """ + + self.endpoint = endpoint + self.errors = errors + + def __str__ (self) : + # XXX: too verbose? + return "Unable to bind() to any addresses on endpoint %s:\n%s" % (self.endpoint, "\n".join( + "\t%s" % (error, ) for error in self.errors + )) + + +class Base (object) : + """ + Base class for all other socket-related classes, contains the underlying socket object. + """ + + # the underlying socket object + sock = None + + def _init_sock (self, sock) : + """ + Initialize with the given pre-existing socket. + """ + + self.sock = sock + + +class Common (Base) : + """ + Common operations for Client/Service + """ + + @classmethod + def _socket (self, family, socktype, protocol = 0) : + """ + Construct and return a new socket object using the given parameters. + """ + + return socket.socket(family, socktype, protocol) + + @classmethod + def _bind_addrinfo (cls, ai) : + """ + This will attempt to create a new socket and bind it, based on the given addrinfo, returning the new socket. + + Raises a ServiceBindAddrinfoError if this fails + """ + + try : + # socket() + sock = self._socket(ai.family, ai.socktype, ai.protocol) + + # bind() + sock.bind(ai.addr) + + # XXX: except socket.error as e : + except OSError, error : + raise SocketBindAddrinfoError(ai, error) + + else : + return sock + + @classmethod + def _bind_endpoint (cls, endpoint, family, socktype, protocol=0) : + """ + This will resolve the given endpoint, and attempt to create and bind a suitable socket and return it. + + endpoint - local Endpoint to bind() to. + family - socket address family to use. + socktype - socket type to use + protocol - (optional) specific protocol + + Raises a ServiceBindError if this is unable to create a bound socket. + + XXX: bind to all of the given endpoint's addresses instead of just one...? + """ + + errors = [] + + # resolve the endpoint and try socket+bind + for ai in endpoint.getaddrinfo(family, socktype, protocol, AI_PASSIVE) : + try : + # try to socket+bind this addrinfo + sock = cls._bind_addrinfo(ai) + + except SocketBindAddrinfoError, error : + # collect + errors.append(error) + + # keep trying + continue + + else : + # got a working socket :) + return sock + + else : + # no suitable address found :( + raise SocketBindEndpointError(endpoint, errors) + + def _init_endpoint (self, endpoint, family, socktype, protocol = 0) : + """ + Initialize this socket by constructing a new socket with the given parameters, bound to the given endpoint, + if given. If no endpoint is given, this simply creates a socket with the given settings and does not bind + it anywhere. + """ + + # create local socket + if endpoint : + # create a suitable socket bound to a the given endpoint + self.sock = self._bind_endpoint(endpoint, family, socktype, protocol) + + else : + # create a suitable socket not bound to anything + self.sock = self._socket(family, socktype, protocol) + + +class Service (Common) : + """ + Listener socket + """ + + def _listen (self, backlog) : + """ + Puts this socket into listen() mode with the given backlog. + """ + + self.sock.listen(backlog) + +class SocketConnectAddrinfoError (SocketError) : + """ + The socket was unable to socket()+connect() to the given addrinfo. + """ + + def __init__ (self, addrinfo, error) : + """ + addrinfo - the addrinfo we tried to use + error - the resulting error + """ + + self.addrinfo = addrinfo + self.error = error + + def __str__ (self) : + return "Unable to connect() to %s: %s" % (self.addrinfo, self.error) + +class SocketConnectEndpointError (SocketError) : + """ + The socket was unable to connect() to the any of the given endpoint's addresses. + """ + + def __init__ (self, endpoint, errors) : + """ + endpoint - the endpoint we tried to connect to + errors - a sequence of ServiceBindAddrinfoErrors describing the failed connect attempts. + """ + + self.endpoint = endpoint + self.errors = errors + + def __str__ (self) : + # XXX: too verbose? + return "Unable to connect() to any addresses on endpoint %s:\n%s" % (self.endpoint, "\n".join( + "\t%s" % (error, ) for error in self.errors + )) + + +class Client (Common) : + """ + Connecting socket + """ + + @classmethod + def _connect_sock_addr (cls, sock, addr) : + """ + Attempt to connect the given socket to the given address. + """ + + sock.connect(addr) + + @classmethod + def _connect_sock_addrinfo (cls, sock, ai) : + """ + Attempt to connect the given socket to the given addrinfo's address. + """ + + try : + cls._connect_sock_addr(sock, ai.addr) + + # XXX: except socket.error as e : + except OSError, error : + raise SocketConnectAddrinfoError(ai, error) + + @classmethod + def _connect_addrinfo (cls, ai) : + """ + Attempt to create a socket and connect it based on the given addrinfo, returning the new socket is succesfull. + """ + + try : + # socket() + sock = cls._socket(ai.family, ai.socktype, ai.protocol) + + # XXX: except socket.error as e : + except OSError, error : + raise SocketConnectAddrinfoError(ai, error) + + + # try and connect() it + cls._connect_sock_addrinfo(sock, ai) + + # return once succesfully + return sock + + @classmethod + def _connect_sock_endpoint (cls, sock, endpoint, family, socktype, protocol = 0) : + """ + Connect this socket to the given remote endpoint, using the given parameters to resolve the endpoint. + """ + + errors = [] + + # resolve the endpoint and try socket+bind + for ai in endpoint.getaddrinfo(family, socktype, protocol) : + try : + # try to connect the socket to this addrinfo + cls._connect_sock_addrinfo(sock, ai) + + except SocketConnectAddrinfoError, error : + # collect + errors.append(error) + + # keep trying + continue + + else : + # got a working socket :) + return + + else : + # no suitable address found :( + raise SocketConnectEndpointError(endpoint, errors) + + @classmethod + def _connect_endpoint (self, endpoint, family, socktype, protocol = 0) : + """ + Create a new socket and connect it to the given remote endpoint, using the given parameters to resolve the + endpoint. + """ + + errors = [] + + # resolve the endpoint and try socket+bind + for ai in endpoint.getaddrinfo(family, socktype, protocol) : + try : + # try to socket+connect this addrinfo + sock = cls._connect_addrinfo(ai) + + except SocketConnectAddrinfoError, error : + # collect + errors.append(error) + + # keep trying + continue + + else : + # got a working socket :) + return sock + + else : + # no suitable address found :( + raise SocketConnectEndpointError(endpoint, errors) + + def _init_connect_endpoint (self, endpoint, family, socktype, protocol = 0): + """ + If we already have an existing socket, connect it to the given endpoint, otherwise try and connect to the + given endpoint with a new socket. + + There is a subtle difference here, because if we have e.g. an IPv4 socket and try and connect it to an + endpoint with both IPv6 and IPv4 addresses, we will try to connect to an IPv6 address using the IPv4 socket, + and then to the IPv4 address using the IPv6 socket. + + If we do not yet have a socket, then we will attempt to connect to the IPv6 address using an IPv6 socket, + and to the IPv4 address using an IPv4 socket. + """ + + if self.socket : + # connect with existing socket + self._connect_sock_endpoint(self.socket, endpoint, family, socktype, protocol) + + else : + # connect with new socket + self._connect_endpoint(endpoint, family, socktype, protocol) + +class Stream (Base) : + """ + Unbuffered byte-stream interface. + """ + + def read (self, iov) : + return self.sock.read(iov) + + def readv (self, iovecs) : + return self.sock.readv(iovecs) + + def write (self, buf) : + return self.sock.write(buf) + + def writev (self, iovecs) : + return self.sock.writev(iovecs) + + def close (self) : + self.sock.close() + + def abort (self) : + # XXX: SO_LINGER magic + + raise NotImplementedError() + + diff -r 12468e38227e -r 020c89baaa33 qmsk/net/transport/stream.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/qmsk/net/transport/stream.py Fri Aug 21 00:30:06 2009 +0300 @@ -0,0 +1,103 @@ +""" + Abstract byte-stream transport interface. +""" + +from qmsk.net.transport import transport + +class Stream (transport.Transport) : + """ + A byte stream oriented transport. + + This provides a sequenced, reliable, full-duplex, connection-oriented byte stream. + """ + + def read (self, iov) : + """ + Read and return up to `iov` bytes from this stream. This may return fewer bytes than requested. + + If the stream does not contain any more data (i.e. EOF), an empty string is returned. + """ + + raise NotImplementedError() + + def readv (self, iovecs) : + """ + Attempt to read data into a sequence of iov's, returning a sequence of buffers with the read data. + + This may return fewer buffers than specified if there was not sufficient data to fill all buffers. + + By default, this is simply implemented using read(), but an implementation may also provide a more + efficient version. + """ + + ret = [] + + for iovec in iovecs : + buf = self.read(iovec) + + if buf : + ret.append(buf) + + else : + # EOF! This should not be returned by readv! + break + + return ret + + def write (self, buf) : + """ + Attempt to write the given data to this stream, returning the number of bytes written, which may be less + than the length of the buffer given. + """ + + raise NotImplementedError() + + def writev (self, iovecs) : + """ + Attempt to write data from the given sequence of buffers to this stream, returning the total number of + bytes written, which may be less than the total length of the buffers given. + + By default, this is simply implemented using write(), but an implementation may also provide a more + efficient version. + """ + + ret = 0 + + for buf in iovec : + # send this buffer + buf_len = self.write(buf) + + # count total bytes sent + ret += buf_len + + if buf_len < len(buf) : + # buffer was sent incompletely, do not try and send any more + break + + return ret + + def close (eslf) : + """ + Close this stream, disallowing any more read/write operations. + + This discards any unread data, but written data that has been buffered will still be sent. + """ + + raise NotImplementedError() + + def abort (self) : + """ + Close this stream immediately, invalidating it for future use. + + This discards any unread data, and may discard written data that has been buffered and not yet sent. This + should also ignore any close-related errors. + + By default, this is implemented using close(), but an implementation may also provide a more efficient + version. + + XXX: remove default implementation? + """ + + # XXX: ignore close-related errors? + self.close() + diff -r 12468e38227e -r 020c89baaa33 qmsk/net/transport/tcp.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/qmsk/net/transport/tcp.py Fri Aug 21 00:30:06 2009 +0300 @@ -0,0 +1,118 @@ +""" + TCP service/client implementation. +""" + +from qmsk.net.transport import service, client, stream, socket + +# default backlog for listen() +# XXX: number pulled out of a hat +LISTEN_BACKLOG = 5 + +class Connection (socket.Stream, stream.Stream) : + """ + Stream interface for a TCP connection + """ + + def __init__ (self, sock) : + """ + Initialize with the given already-existing, connected, socket. + """ + + self._init_sock(sock) + + def shutdown (self, how) : + """ + Selectively shut-down parts of all of the full-duplex TCP connection. + + how - one of socket.SHUT_* to shutdown read, write or both. + """ + + self.sock.shutdown(how) + +class Service (socket.Service, service.Service) : + """ + An implementation of Service for TCP sockets. + """ + + def __init__ (self, endpoint, af=socket.AF_UNSPEC, listen_backlog=LISTEN_BACKLOG) : + """ + Construct a service, bound to the given local endpoint and listening for incoming connections using the + given backlog. + + endpoint - local Endpoint to bind() to. Usually, it is enough to just specify the port. + af - the socket address family to use (one of qmsk.net.socket.constants.AF_*) + listen_backlog - backlog length argument to use for socket.listen() + + + This will raise an error if the bind() or listen() operations fail. + """ + + # construct a suitable socket bound to the given endpoint + self._init_endpoint(endpoint, af, socket.SOCK_STREAM) + + # make us listen + self._listen(listen_backlog) + + # ok, great + + def accept (self, cls=Connection) : + """ + Perform an accept() operation on our socket, returning a tcp.Connection. + """ + + # XXX: trap and raise a ServiceAcceptError? + # XXX: what to do with addr? + sock, addr = self.sock.accept() + + # construct the new Stream + return cls(sock) + + def close (self) : + """ + Close the underlying socket object, invalidating this Service for future use. + + This will raise if the underlying socket.close() operation does. + """ + + self.sock.close() + + +class Client (socket.Client, client.Client) : + """ + An implementation of Client for TCP sockets. + """ + + def __init__ (self, connect_endpoint, af=socket.AF_UNSPEC, bind_endpoint=None) : + """ + Construct a client, connecting to the given remote endpoint. + + connect_endpoint - remote Endpoint to connect() to. + af - socket address family to use (one of qmsk.net.socket.constants.AF_*) + bind_endpoint - (optional) local Endpoint to bind() to before connecting. + + """ + + # store + self.connect_endpoint = connect_endpoint + self.af = af + self.bind_endpoint = bind_endpoint + + def connect (self, cls=Connection) : + """ + Perform the connect() operation, returning a tcp.Connection. + """ + + if self.bind_endpoint : + # construct a suitable local socket, bound to a specific endpoint + sock = self._bind_endpoint(self.bind_endpoint, self.af, socket.SOCK_STREAM) + + # connect it to the remote endpoint + self._connect_sock_endpoint(sock, self.connect_endpoint, self.af, socket.SOCK_STREAM) + + else : + # let _init_connect_endpoint pick a socket to use + sock = self._connect_endpoint(self.connect_endpoint, self.af, socket.SOCK_STREAM) + + # construct + return cls(sock) + diff -r 12468e38227e -r 020c89baaa33 qmsk/net/transport/transport.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/qmsk/net/transport/transport.py Fri Aug 21 00:30:06 2009 +0300 @@ -0,0 +1,11 @@ +""" + Abstract transport interface +""" + +class Transport (object) : + """ + A very abstract interface, which doesn't specify much more than being able to send data in some form somewhere. + + See Stream/??? for more concrete interfaces. + """ +