qmsk/net/transport/tcp.py
branchconnect-async
changeset 45 bb49bf8222ed
parent 37 14db3fe42b6c
equal deleted inserted replaced
44:01ac7755b15a 45:bb49bf8222ed
    79             This will raise if the underlying socket.close() operation does.
    79             This will raise if the underlying socket.close() operation does.
    80         """
    80         """
    81 
    81 
    82         self.sock.close()
    82         self.sock.close()
    83 
    83 
    84 
    84 class Connector (socket.Connect) :
    85 class Client (socket.Client, client.Client) :
    85     """
       
    86         The connect() state machine.
       
    87     """
       
    88 
       
    89     def __init__ (self, bind_endpoint=None, family=None) :
       
    90         # bind()?
       
    91         if bind_endpoint or family :
       
    92             # construct a socket as defined
       
    93             self._init_endpoint(bind_endpoint, family=family)
       
    94  
       
    95     def start (self, endpoint) :
       
    96         """
       
    97             Start connecting to the given endpoint
       
    98         """
       
    99 
       
   100         # connect()-time errors
       
   101         self._errors = []
       
   102         
       
   103         # resolve the list of addresses to try and connect to
       
   104         self._ais = self._resolve_endpoint(endpoint)
       
   105     
       
   106     def cleanup (self) :
       
   107         """
       
   108             Destroy any used state
       
   109         """
       
   110 
       
   111         del self._errors
       
   112         del self._ais
       
   113 
       
   114     def build_error (self) :
       
   115         """
       
   116             Build and return an error object for this connect operation, and cleanup
       
   117         """
       
   118 
       
   119         error = socket.SocketConnectEndpointError(self._connect_endpoint, self._connect_errors)
       
   120         
       
   121         self.cleanup()
       
   122 
       
   123         return error
       
   124 
       
   125     def operate (self, nonblocking=True) :
       
   126         """
       
   127             Try and connect to each of our AddrInfo's in turn, collecting any errors, until we either run out of
       
   128             addresses to try, or we manage to connect().
       
   129 
       
   130                 nonblocking - perform non-blocking connect()'s, so put the socket into non-blocking mode and treat
       
   131                               EINPROGRESS as a successful connect().
       
   132 
       
   133             Returns True if we managed to connect, otherwise False.
       
   134         """
       
   135         
       
   136         # get next addrinfo to try
       
   137         for ai in self._connect_ais :
       
   138             try :
       
   139                 if self.sock :
       
   140                     # try and connect the existing socket
       
   141                     self._connect_sock_addrinfo(self.sock, ai, nonblocking)
       
   142                 
       
   143                 else :
       
   144                     # create a new socket and connect it
       
   145                     self.sock = self._connect_addrinfo(ai, nonblocking)
       
   146             
       
   147             except SocketConnectAddrinfoError, error :
       
   148                 # log it
       
   149                 self._errors.append(error)
       
   150 
       
   151                 # try the next one
       
   152                 continue
       
   153             
       
   154             else :
       
   155                 # yay!
       
   156                 return True
       
   157 
       
   158         else :
       
   159             # unable to connect anywhere, nothing left to try
       
   160             return False
       
   161     
       
   162     def next (self) :
       
   163         """
       
   164             Operate asynchronously.
       
   165         """
       
   166 
       
   167         if not self.operate() :
       
   168             # faail
       
   169             pass
       
   170         
       
   171 
       
   172     def connect (self, endpoint) :
       
   173         """
       
   174             Operate synchronously, either raising an error, or returning a socket.
       
   175         """
       
   176         
       
   177         # init
       
   178         self.start(endpoint)
       
   179 
       
   180         if not self.operate() :
       
   181             # nay :(
       
   182             raise self.build_error()
       
   183 
       
   184         else :
       
   185             # yay :)
       
   186             return self.sock
       
   187         
       
   188 class Client (client.Client) :
    86     """
   189     """
    87         An implementation of Client for TCP sockets.
   190         An implementation of Client for TCP sockets.
    88     """
   191     """
    89 
   192 
    90     _SOCKTYPE = socket.SOCK_STREAM
   193     _SOCKTYPE = socket.SOCK_STREAM
    91 
   194 
    92     def __init__ (self, connect_endpoint, bind_endpoint=None) :
   195     def __init__ (self, connect_endpoint, bind_endpoint=None, family=None) :
    93         """
   196         """
    94             Construct a client, connecting to the given remote endpoint.
   197             Construct a client, connecting to the given remote endpoint.
    95 
   198 
    96                 connect_endpoint    - remote Endpoint to connect() to.
   199                 connect_endpoint    - remote Endpoint to connect() to.
    97                 bind_endpoint       - (optional) local Endpoint to bind() to before connecting.
   200                 bind_endpoint       - (optional) local Endpoint to bind() to before connecting.
    98 
   201                 family              - (optional) family to create sockaddr for
    99         """
   202         """
   100 
   203 
   101         # store
   204         # store
   102         self.connect_endpoint = connect_endpoint
   205         self._connect_endpoint = connect_endpoint
   103         self.bind_endpoint = bind_endpoint
   206         
       
   207 
       
   208    
       
   209    
       
   210   
       
   211    
       
   212     def _connect_next (self) :
       
   213         """
       
   214             Driver for the async connect process
       
   215         """
       
   216 
       
   217         if self._connect(async=True) :
       
   218             # yay :)
       
   219             self._connect_deinit()
       
   220 
       
   221             self.on_connect()
       
   222 
       
   223         else :
       
   224             # nay :(
       
   225             self.on_error(self._connect_error())
   104 
   226 
   105     def connect (self, cls=Connection) :
   227     def connect (self, cls=Connection) :
   106         """
   228         """
   107             Perform the connect() operation, returning a tcp.Connection.
   229             Perform a synchronous connect() operation.
   108         """
   230         """
   109 
   231 
   110         if self.bind_endpoint :
   232         connector = Connector(self.bind_endpoint, self.family)
   111             # construct a suitable local socket, bound to a specific endpoint
   233 
   112             sock = self._bind_endpoint(self.bind_endpoint)
   234         sock = connector.connect(self.connect_endpoint)
   113 
   235 
   114             # connect it to the remote endpoint
       
   115             self._connect_sock_endpoint(sock, self.connect_endpoint)
       
   116 
       
   117         else :
       
   118             # let _init_connect_endpoint pick a socket to use
       
   119             sock = self._connect_endpoint(self.connect_endpoint)
       
   120         
       
   121         # construct
       
   122         return cls(sock)
   236         return cls(sock)
   123 
   237 
       
   238     def connect_async (self, reactor=None) :
       
   239         """
       
   240             Perform an asynchronous connect() operation, returning a Connector object.
       
   241         """
       
   242 
       
   243         connector = Connector(self.bind_endpoint, self.family)
       
   244 
       
   245         connector.start(self.connect_endpoint)
       
   246 
       
   247         return connector
       
   248 
       
   249     def on_connect (self) :
       
   250         """
       
   251             Successfully connected.
       
   252         """
       
   253 
       
   254         pass
       
   255 
       
   256     def on_error (self, error) :
       
   257         """
       
   258             Connection failed.
       
   259         """
       
   260         
       
   261         pass
       
   262