src/proto2/NetworkTCP.cc
branch no-netsession
changeset 38 4189b8bf3a5b
parent 37 ed2957490bbf
child 39 cfb8b6e2a35f
equal deleted inserted replaced
37:ed2957490bbf 38:4189b8bf3a5b
     1 
     1 
     2 #include "NetworkTCP.hh"
     2 #include "NetworkTCP.hh"
       
     3 #include "Engine.hh"
     3 
     4 
     4 #include <cstdlib>
     5 #include <cstdlib>
     5 #include <cassert>
     6 #include <cassert>
     6 
     7 
     7 NetworkBuffer::NetworkBuffer (NetworkSocket &socket, size_t size_hint) :
     8 NetworkBuffer::NetworkBuffer (NetworkSocket &socket, size_t size_hint) :
    64         ret = socket.recv(buf + offset, to_read);
    65         ret = socket.recv(buf + offset, to_read);
    65 
    66 
    66     } catch (CL_Error &e) {
    67     } catch (CL_Error &e) {
    67         if (errno == EAGAIN)
    68         if (errno == EAGAIN)
    68             return false;
    69             return false;
       
    70 
    69         else
    71         else
    70             throw;
    72             throw NetworkSocketOSError(socket, "recv");
    71     }
    73     }
    72 
    74     
    73     assert(ret > 0);
    75     // handle EOF
       
    76     if (ret == 0)
       
    77         throw NetworkSocketEOFError(socket, "recv");
       
    78 
       
    79     assert(ret >= 0);
    74 
    80 
    75     // update offset
    81     // update offset
    76     offset += ret;
    82     offset += ret;
    77 
    83 
    78     // did we get enough?
    84     // did we get enough?
   134     // copy the data over, unless it's too large
   140     // copy the data over, unless it's too large
   135     if (prefix <= buf_max) {
   141     if (prefix <= buf_max) {
   136         memcpy(buf_ptr, buf, prefix);
   142         memcpy(buf_ptr, buf, prefix);
   137     
   143     
   138         // trim the bytes out
   144         // trim the bytes out
   139         trim(prefix);
   145         trim(sizeof(PrefixType) + prefix);
   140         
   146         
   141         // return
   147         // return
   142         return prefix;
   148         return prefix;
   143 
   149 
   144     } else {
   150     } else {
   145         // trim the bytes out
   151         // trim the bytes out
   146         trim(prefix);
   152         trim(sizeof(PrefixType) + prefix);
   147         
   153         
   148         throw NetworkBufferError("recv prefix overflow");   
   154         throw NetworkBufferError("recv prefix overflow");   
   149     }
   155     }
   150 }
   156 }
   151    
   157    
   159             ret = socket.send(buf_ptr, buf_size);
   165             ret = socket.send(buf_ptr, buf_size);
   160 
   166 
   161         } catch (CL_Error &e) {
   167         } catch (CL_Error &e) {
   162             // ignore EAGAIN, detect this by setting ret to -1
   168             // ignore EAGAIN, detect this by setting ret to -1
   163             if (errno != EAGAIN)
   169             if (errno != EAGAIN)
   164                 throw;
   170                 throw NetworkSocketOSError(socket, "send");
   165 
   171 
   166             ret = -1;
   172             ret = -1;
   167         }
   173         }
   168         
   174         
   169         // if we managed to send something, adjust buf/size and buffer
   175         // if we managed to send something, adjust buf/size and buffer
   202         // ignore EAGAIN and just return
   208         // ignore EAGAIN and just return
   203         if (errno == EAGAIN)
   209         if (errno == EAGAIN)
   204             return;
   210             return;
   205 
   211 
   206         else
   212         else
   207             throw;
   213             throw NetworkSocketOSError(socket, "send");
   208     }
   214     }
   209 
   215 
   210     // trim the buffer
   216     // trim the buffer
   211     trim(ret);
   217     trim(ret);
   212 }
   218 }
   213         
   219         
   214 template <typename PrefixType> void NetworkBuffer::write_prefix (char *buf, PrefixType prefix) {
   220 void NetworkBuffer::write_prefix (char *buf, uint16_t prefix) {
   215     push_write((char*) &prefix, sizeof(PrefixType)); 
   221     uint16_t nval = htons(prefix);
       
   222 
       
   223     push_write((char*) &nval, sizeof(uint16_t)); 
   216     push_write(buf, prefix);
   224     push_write(buf, prefix);
   217 }
   225 }
   218 
   226 
       
   227 void NetworkBuffer::write_prefix (char *buf, uint32_t prefix) {
       
   228     uint32_t nval = htonl(prefix);
       
   229 
       
   230     push_write((char*) &nval, sizeof(uint32_t)); 
       
   231     push_write(buf, prefix);
       
   232 }
   219 
   233 
   220 NetworkTCPTransport::NetworkTCPTransport (NetworkSocket socket) :
   234 NetworkTCPTransport::NetworkTCPTransport (NetworkSocket socket) :
   221     socket(socket), in(socket, NETWORK_TCP_INITIAL_IN_BUF), out(socket, NETWORK_TCP_INITIAL_OUT_BUF) {
   235     socket(socket), in(socket, NETWORK_TCP_INITIAL_IN_BUF), out(socket, NETWORK_TCP_INITIAL_OUT_BUF) {
   222     
   236     
   223     // use nonblocking sockets
       
   224     socket.set_nonblocking(true);
       
   225 
       
   226     // connect signals
   237     // connect signals
   227     slots.connect(socket.sig_read_triggered(), this, &NetworkTCPTransport::on_read);
   238     slots.connect(socket.sig_read_triggered(), this, &NetworkTCPTransport::on_read);
   228     slots.connect(socket.sig_write_triggered(), this, &NetworkTCPTransport::on_write);
   239     slots.connect(socket.sig_write_triggered(), this, &NetworkTCPTransport::on_write);
   229     slots.connect(socket.sig_disconnected(), this, &NetworkTCPTransport::on_disconnected);
   240     slots.connect(socket.sig_disconnected(), this, &NetworkTCPTransport::on_disconnected);
   230 }
   241 }
   254 void NetworkTCPTransport::write_packet (const NetworkPacket &packet) {
   265 void NetworkTCPTransport::write_packet (const NetworkPacket &packet) {
   255     uint16_t prefix = packet.get_data_size();
   266     uint16_t prefix = packet.get_data_size();
   256     
   267     
   257     if (prefix != packet.get_data_size())
   268     if (prefix != packet.get_data_size())
   258         throw CL_Error("send prefix overflow");
   269         throw CL_Error("send prefix overflow");
   259 
   270     
   260     // just write to the output buffer
   271     try {
   261     out.write_prefix<uint16_t>((char *) packet.get_buf(), prefix);
   272         // just write to the output buffer
       
   273         out.write_prefix((char *) packet.get_buf(), prefix);
       
   274 
       
   275     } catch (Error &e) {
       
   276         const char *err = e.what();
       
   277 
       
   278         Engine::log(ERROR, "tcp.write_packet") << err;
       
   279         
       
   280         throw;    
       
   281     }
   262 }
   282 }
   263 
   283 
   264 NetworkTCPServer::NetworkTCPServer (const NetworkAddress &listen_addr) :
   284 NetworkTCPServer::NetworkTCPServer (const NetworkAddress &listen_addr) :
   265     socket(CL_Socket::tcp, CL_Socket::ipv4) {
   285     socket(CL_Socket::tcp, CL_Socket::ipv4) {
   266     
   286     
   270     // assign slots
   290     // assign slots
   271     slots.connect(socket.sig_read_triggered(), this, &NetworkTCPServer::on_accept);
   291     slots.connect(socket.sig_read_triggered(), this, &NetworkTCPServer::on_accept);
   272 
   292 
   273     // listen
   293     // listen
   274     socket.listen(NETWORK_LISTEN_BACKLOG);
   294     socket.listen(NETWORK_LISTEN_BACKLOG);
       
   295     
       
   296     // use nonblocking sockets
       
   297     socket.set_nonblocking(true);
   275 }
   298 }
   276 
   299 
   277 
   300 
   278 void NetworkTCPServer::on_accept (void) {
   301 void NetworkTCPServer::on_accept (void) {
   279     // accept a new socket
   302     // accept a new socket
   293 NetworkTCPClient::NetworkTCPClient (const NetworkAddress &connect_addr) :
   316 NetworkTCPClient::NetworkTCPClient (const NetworkAddress &connect_addr) :
   294     NetworkTCPTransport(NetworkSocket(CL_Socket::tcp, CL_Socket::ipv4)) {
   317     NetworkTCPTransport(NetworkSocket(CL_Socket::tcp, CL_Socket::ipv4)) {
   295 
   318 
   296     // connect
   319     // connect
   297     socket.connect(connect_addr);
   320     socket.connect(connect_addr);
   298 
   321     
   299 }
   322     // use nonblocking sockets
       
   323     socket.set_nonblocking(true);
       
   324 }