"""
TCP service/client implementation.
"""
from qmsk.net.transport import service, client, stream, socket
# Default backlog length handed to listen(); used as the default value of
# Service's listen_backlog argument.
# XXX: the value is arbitrary (pulled out of a hat), not tuned.
LISTEN_BACKLOG = 5
class Connection(socket.Stream, stream.Stream):
    """
        A TCP connection exposed through the Stream interface.
    """

    def __init__(self, sock):
        """
            Wrap a socket that already exists and is already connected.

                sock    - the connected socket to adopt; it is handed to _init_sock()
        """

        self._init_sock(sock)

    def shutdown(self, how):
        """
            Shut down one direction, or both directions, of the full-duplex TCP connection.

                how     - one of socket.SHUT_*, selecting the read side, the write side, or both

            The call is forwarded to the underlying socket's shutdown().
        """

        underlying = self.sock
        underlying.shutdown(how)
class Service(socket.Service, service.Service):
    """
        TCP flavour of Service: a bound, listening stream socket that hands out Connections.
    """

    _SOCKTYPE = socket.SOCK_STREAM

    def __init__(self, endpoint, listen_backlog=LISTEN_BACKLOG, family=None):
        """
            Set up the listening socket for this service.

                endpoint        - local Endpoint to bind() to; usually specifying just the port is enough
                listen_backlog  - backlog length passed on to listen()
                family          - (optional) address family, used when no endpoint is given

            Special case: endpoint may be None. No bind() is performed then; a socket of the given
            address family (which *MUST* be supplied) is created instead, and listen() leaves it to the
            OS to choose the local address.

            Errors from the bind() or listen() steps are raised to the caller.
        """

        # step 1: obtain a socket for the endpoint / family
        self._init_endpoint(endpoint, family=family)

        # step 2: start listening on it
        self._listen(listen_backlog)

    def accept(self, cls=Connection):
        """
            accept() one incoming connection on our socket.

                cls     - callable used to wrap the accepted socket (tcp.Connection by default)

            Returns cls(accepted_socket).
        """

        # XXX: trap and raise a ServiceAcceptError?
        # XXX: the peer address is currently discarded
        accepted_sock, _peer_addr = self.sock.accept()

        return cls(accepted_sock)

    def close(self):
        """
            Close the underlying socket; this Service must not be used afterwards.

            Whatever the socket's close() raises is propagated.
        """

        listening_sock = self.sock
        listening_sock.close()
class Connector(socket.Connect):
    """
        The connect() state machine.

        Per-attempt state, set up by start() and torn down by cleanup():

            _connect_endpoint   - the endpoint we were asked to connect to
            _ais                - the resolved addresses to try, from _resolve_endpoint()
            _errors             - the per-address connect errors collected so far
    """

    def __init__(self, bind_endpoint=None, family=None):
        """
            Prepare a connector.

                bind_endpoint   - (optional) local Endpoint passed to _init_endpoint()
                family          - (optional) address family passed to _init_endpoint()

            If neither is given, no socket is set up here; operate() then creates one for each
            address it tries.
        """

        # bind()?
        if bind_endpoint or family:
            # construct a socket as defined
            self._init_endpoint(bind_endpoint, family=family)

    def start(self, endpoint):
        """
            Start connecting to the given endpoint.

            This only records the target and resolves it; the actual connect attempts happen in
            operate().
        """

        # remembered so that build_error() can report what we were connecting to
        self._connect_endpoint = endpoint

        # connect()-time errors
        self._errors = []

        # resolve the list of addresses to try and connect to
        self._ais = self._resolve_endpoint(endpoint)

    def cleanup(self):
        """
            Destroy the state set up by start().

            Raises AttributeError if there is no such state (start() not called, or already
            cleaned up).
        """

        del self._connect_endpoint
        del self._errors
        del self._ais

    def build_error(self):
        """
            Build and return an error object for this connect operation, and cleanup.

            The error is a socket.SocketConnectEndpointError built from the endpoint given to
            start() and the per-address errors collected by operate().
        """

        error = socket.SocketConnectEndpointError(self._connect_endpoint, self._errors)

        self.cleanup()

        return error

    def operate(self, nonblocking=True):
        """
            Try and connect to each of our resolved addresses in turn, collecting any errors, until we
            either run out of addresses to try, or we manage to connect().

                nonblocking - perform non-blocking connect()'s, so put the socket into non-blocking mode
                              and treat EINPROGRESS as a successful connect().

            Returns True if we managed to connect, otherwise False.

            Must be preceded by start().
        """

        # NOTE(review): if _resolve_endpoint() returns a list rather than a one-shot iterator, a
        # repeated operate() call starts again from the first address - confirm against socket.Connect
        for ai in self._ais:
            try:
                # __init__ only sets up a socket when a bind endpoint or family was given, so the
                # attribute may legitimately be missing here
                sock = getattr(self, "sock", None)

                if sock:
                    # try and connect the existing socket
                    self._connect_sock_addrinfo(sock, ai, nonblocking)

                else:
                    # create a new socket and connect it
                    self.sock = self._connect_addrinfo(ai, nonblocking)

            # qualified with the module, like SocketConnectEndpointError in build_error(); the bare
            # name is not imported into this module
            except socket.SocketConnectAddrinfoError as error:
                # log it, and try the next one
                self._errors.append(error)
                continue

            else:
                # yay!
                return True

        # unable to connect anywhere, nothing left to try
        return False

    def next(self):
        """
            Operate asynchronously.

            XXX: unfinished - a failed operate() is currently ignored.
        """

        if not self.operate():
            # faail
            pass

    def connect(self, endpoint):
        """
            Operate synchronously, either raising an error, or returning a socket.

                endpoint    - the endpoint to connect to

            Raises the error from build_error() if none of the resolved addresses could be
            connected to.
        """

        # init
        self.start(endpoint)

        # a synchronous connect must not treat EINPROGRESS as success, so do blocking connects
        if not self.operate(nonblocking=False):
            # nay :(
            raise self.build_error()

        # yay :)
        return self.sock
class Client(client.Client):
    """
        An implementation of Client for TCP sockets.
    """

    _SOCKTYPE = socket.SOCK_STREAM

    def __init__(self, connect_endpoint, bind_endpoint=None, family=None):
        """
            Construct a client for connecting to the given remote endpoint.

                connect_endpoint    - remote Endpoint to connect() to.
                bind_endpoint       - (optional) local Endpoint to bind() to before connecting.
                family              - (optional) family to create sockaddr for

            Nothing is connected here; the values are only stored for connect() / connect_async().
        """

        # store under the names that connect() and connect_async() read
        self.connect_endpoint = connect_endpoint
        self.bind_endpoint = bind_endpoint
        self.family = family

        # the original private name, kept for any code that still reads it
        self._connect_endpoint = connect_endpoint

    def _connect_next(self):
        """
            Driver for the async connect process.

            NOTE(review): _connect(), _connect_deinit() and _connect_error() are not defined in this
            class; presumably they come from client.Client - confirm.
        """

        # 'async' is a reserved word from Python 3.7 on, so pass the keyword via a dict; the call
        # itself is unchanged
        if self._connect(**{"async": True}):
            # yay :)
            self._connect_deinit()

            self.on_connect()

        else:
            # nay :(
            self.on_error(self._connect_error())

    def connect(self, cls=Connection):
        """
            Perform a synchronous connect() operation.

                cls     - callable used to wrap the connected socket (tcp.Connection by default)

            Returns cls(sock) for the connected socket; errors from Connector.connect() propagate.
        """

        connector = Connector(self.bind_endpoint, self.family)

        sock = connector.connect(self.connect_endpoint)

        return cls(sock)

    def connect_async(self, reactor=None):
        """
            Perform an asynchronous connect() operation, returning a Connector object.

                reactor - currently unused

            The returned Connector has been start()'ed on our connect endpoint, but no connect
            attempt has been made on it yet.
        """

        connector = Connector(self.bind_endpoint, self.family)

        connector.start(self.connect_endpoint)

        return connector

    def on_connect(self):
        """
            Successfully connected.

            Does nothing by default.
        """

        pass

    def on_error(self, error):
        """
            Connection failed.

                error   - the error describing the failure

            Does nothing by default.
        """

        pass