# HG changeset patch # User Tero Marttila # Date 1251746356 -10800 # Node ID bb49bf8222ed81a38b5613a073293338e5a2e811 # Parent 01ac7755b15a1b93c2e84e7fe415f250a758b785 initial async connect attempt diff -r 01ac7755b15a -r bb49bf8222ed qmsk/net/transport/client.py --- a/qmsk/net/transport/client.py Mon Aug 31 22:17:54 2009 +0300 +++ b/qmsk/net/transport/client.py Mon Aug 31 22:19:16 2009 +0300 @@ -30,3 +30,6 @@ raise NotImplementedError() + def connect_async (self, cls=None, reactor=None) : + raise NotImplementedError() + diff -r 01ac7755b15a -r bb49bf8222ed qmsk/net/transport/socket.py --- a/qmsk/net/transport/socket.py Mon Aug 31 22:17:54 2009 +0300 +++ b/qmsk/net/transport/socket.py Mon Aug 31 22:19:16 2009 +0300 @@ -5,6 +5,8 @@ from qmsk.net.socket import socket from qmsk.net.socket.constants import * +import errno + class SocketError (Exception) : """ Base class of errors raised by the Socket classes. @@ -62,7 +64,21 @@ """ self.sock = sock + + def _set_nonblocking (self, nonblocking) : + """ + Set/unset non-blocking I/O mode. + """ + + # XXX: implement elsewhere + import fcntl, os + flags = 0 + + if nonblocking : + flags |= os.O_NONBLOCK + + fcntl.fcntl(self.sock, fcntl.F_SETFL, flags) class Common (Base) : """ @@ -104,6 +120,19 @@ else : return sock + + @classmethod + def _resolve_endpoint (cls, endpoint, socktype = None, protocol = 0, flags = 0) : + """ + Yield an *iterator* of AddrInfo objects by resolving the given endpoint endpoint using parameters suitable + for this socket. + """ + + if socktype is None : + socktype = cls._SOCKTYPE + + for ai in endpoint.resolve(socktype, protocol, flags) : + yield ai @classmethod def _bind_endpoint (cls, endpoint, socktype = None, protocol=0) : @@ -121,11 +150,8 @@ errors = [] - if socktype is None : - socktype = cls._SOCKTYPE - # resolve the endpoint and try socket+bind - for ai in endpoint.resolve(socktype, protocol, AI_PASSIVE) : + for ai in cls._resolve_endpoint(endpoint, socktype, protocol, AI_PASSIVE) : try : # try to socket+bind this addrinfo sock = cls._bind_addrinfo(ai) @@ -153,6 +179,8 @@ """ if endpoint is not None : + assert not family + # create a suitable socket bound to a the given endpoint self.sock = self._bind_endpoint(endpoint, socktype, protocol) @@ -212,7 +240,7 @@ )) -class Client (Common) : +class Connect (Common) : """ Connecting socket """ @@ -222,13 +250,15 @@ """ Attempt to connect the given socket to the given address. """ - + sock.connect(addr) @classmethod - def _connect_sock_addrinfo (cls, sock, ai) : + def _connect_sock_addrinfo (cls, sock, ai, nonblocking=False) : """ Attempt to connect the given socket to the given addrinfo's address. + + nonblocking - treat EINPROGRESS as success """ try : @@ -236,10 +266,14 @@ # XXX: except socket.error as e : except OSError, error : - raise SocketConnectAddrinfoError(ai, error) + if nonblocking and error.errno == errno.EINPROGRESS : + return + + else : + raise SocketConnectAddrinfoError(ai, error) @classmethod - def _connect_addrinfo (cls, ai) : + def _connect_addrinfo (cls, ai, nonblocking=False) : """ Attempt to create a socket and connect it based on the given addrinfo, returning the new socket is succesfull. """ @@ -252,9 +286,8 @@ except OSError, error : raise SocketConnectAddrinfoError(ai, error) - # try and connect() it - cls._connect_sock_addrinfo(sock, ai) + cls._connect_sock_addrinfo(sock, ai, nonblocking) # return once succesfully return sock @@ -267,11 +300,8 @@ errors = [] - if socktype is None : - socktype = cls._SOCKTYPE - # resolve the endpoint and try socket+bind - for ai in endpoint.resolve(socktype, protocol) : + for ai in cls._resolve_endpoint(endpoint, socktype, protocol) : try : # try to connect the socket to this addrinfo cls._connect_sock_addrinfo(sock, ai) @@ -300,11 +330,8 @@ errors = [] - if socktype is None : - socktype = cls._SOCKTYPE - # resolve the endpoint and try socket+bind - for ai in endpoint.resolve(socktype, protocol) : + for ai in cls._resolve_endpoint(endpoint, socktype, protocol) : try : # try to socket+connect this addrinfo sock = cls._connect_addrinfo(ai) @@ -337,13 +364,13 @@ and to the IPv4 address using an IPv4 socket. """ - if self.socket : + if self.sock : # connect with existing socket - self._connect_sock_endpoint(self.socket, endpoint, socktype, protocol) + self._connect_sock_endpoint(self.sock, endpoint, socktype, protocol) else : # connect with new socket - self._connect_endpoint(endpoint, socktype, protocol) + self.sock = self._connect_endpoint(endpoint, socktype, protocol) class Stream (Base) : """ diff -r 01ac7755b15a -r bb49bf8222ed qmsk/net/transport/tcp.py --- a/qmsk/net/transport/tcp.py Mon Aug 31 22:17:54 2009 +0300 +++ b/qmsk/net/transport/tcp.py Mon Aug 31 22:19:16 2009 +0300 @@ -81,43 +81,182 @@ self.sock.close() +class Connector (socket.Connect) : + """ + The connect() state machine. + """ -class Client (socket.Client, client.Client) : + def __init__ (self, bind_endpoint=None, family=None) : + # bind()? + if bind_endpoint or family : + # construct a socket as defined + self._init_endpoint(bind_endpoint, family=family) + + def start (self, endpoint) : + """ + Start connecting to the given endpoint + """ + + # connect()-time errors + self._errors = [] + + # resolve the list of addresses to try and connect to + self._ais = self._resolve_endpoint(endpoint) + + def cleanup (self) : + """ + Destroy any used state + """ + + del self._errors + del self._ais + + def build_error (self) : + """ + Build and return an error object for this connect operation, and cleanup + """ + + error = socket.SocketConnectEndpointError(self._connect_endpoint, self._connect_errors) + + self.cleanup() + + return error + + def operate (self, nonblocking=True) : + """ + Try and connect to each of our AddrInfo's in turn, collecting any errors, until we either run out of + addresses to try, or we manage to connect(). + + nonblocking - perform non-blocking connect()'s, so put the socket into non-blocking mode and treat + EINPROGRESS as a succesful connect(). + + Returns True if we managed to connect, otherwise False. + """ + + # get next addrinfo to try + for ai in self._connect_ais : + try : + if self.sock : + # try and connect the existing socket + self._connect_sock_addrinfo(self.sock, ai, nonblocking) + + else : + # create a new socket and connect it + self.sock = self._connect_addrinfo(ai, nonblocking) + + except SocketConnectAddrinfoError, error : + # log it + self._errors.append(error) + + # try the next one + continue + + else : + # yay! + return True + + else : + # unable to connect anywhere, nothing left to try + return False + + def next (self) : + """ + Operate asynchronously, + """ + + if not self.operate() : + # faail + pass + + + def connect (self, endpoint) : + """ + Operate synchronously, either raising an error, or returning a socket. + """ + + # init + self.start(endpoint) + + if not self.operate() : + # nay :( + raise self.build_error() + + else : + # yay :) + return self.sock + +class Client (client.Client) : """ An implementation of Client for TCP sockets. """ _SOCKTYPE = socket.SOCK_STREAM - def __init__ (self, connect_endpoint, bind_endpoint=None) : + def __init__ (self, connect_endpoint, bind_endpoint=None, family=None) : """ Construct a client, connecting to the given remote endpoint. connect_endpoint - remote Endpoint to connect() to. bind_endpoint - (optional) local Endpoint to bind() to before connecting. - + family - (optional) family to create sockaddr for """ # store - self.connect_endpoint = connect_endpoint - self.bind_endpoint = bind_endpoint + self._connect_endpoint = connect_endpoint + + + + + + + def _connect_next (self) : + """ + Driver for the async connect process + """ + + if self._connect(async=True) : + # yay :) + self._connect_deinit() + + self.on_connect() + + else : + # nay :( + self.on_error(self._connect_error()) def connect (self, cls=Connection) : """ - Perform the connect() operation, returning a tcp.Connection. + Perform a synchronous connect() operation. """ - if self.bind_endpoint : - # construct a suitable local socket, bound to a specific endpoint - sock = self._bind_endpoint(self.bind_endpoint) + connector = Connector(self.bind_endpoint, self.family) - # connect it to the remote endpoint - self._connect_sock_endpoint(sock, self.connect_endpoint) + sock = connector.connect(self.connect_endpoint) - else : - # let _init_connect_endpoint pick a socket to use - sock = self._connect_endpoint(self.connect_endpoint) - - # construct return cls(sock) + def connect_async (self, reactor=None) : + """ + Perform an asynchronous connect() operation, returning a Connector object. + """ + + connector = Connector(self.bind_endpoint, self.family) + + connector.start(self.connect_endpoint) + + return connector + + def on_connect (self) : + """ + Succesfully connected. + """ + + pass + + def on_error (self, error) : + """ + Connection failed. + """ + + pass +