src/proto2/NetworkTCP.cc
branchno-netsession
changeset 37 ed2957490bbf
parent 34 1ea6554d703e
child 38 4189b8bf3a5b
equal deleted inserted replaced
36:785d220fc6b7 37:ed2957490bbf
     7 NetworkBuffer::NetworkBuffer (NetworkSocket &socket, size_t size_hint) :
     7 NetworkBuffer::NetworkBuffer (NetworkSocket &socket, size_t size_hint) :
     8     socket(socket), buf(0), size(0), offset(0) {
     8     socket(socket), buf(0), size(0), offset(0) {
     9     
     9     
    10     // allocate initial buffer
    10     // allocate initial buffer
    11     if ((buf = (char *) malloc(size_hint)) == NULL)
    11     if ((buf = (char *) malloc(size_hint)) == NULL)
    12        throw CL_Error("malloc failed");
    12        throw NetworkBufferError("malloc failed");
    13     
    13     
    14     // remember size
    14     // remember size
    15     size = size_hint;
    15     size = size_hint;
    16 }
    16 }
    17         
    17         
    18 ~NetworkBuffer::NetworkBuffer (void) {
    18 NetworkBuffer::~NetworkBuffer (void) {
    19     free(buf);
    19     free(buf);
    20 }
    20 }
    21 
    21 
    22 void NetworkBuffer::resize (size_t suffix_size) {
    22 void NetworkBuffer::resize (size_t item_size) {
    23     size_t new_size = size;
    23     size_t new_size = size;
    24 
    24 
    25     // grow new_size until item_size fits
    25     // grow new_size until item_size fits
    26     while (offset + item_size > new_size)
    26     while (offset + item_size > new_size)
    27         new_size *= 2;
    27         new_size *= 2;
    28     
    28     
    29     // grow if needed
    29     // grow if needed
    30     if (new_size != size) {
    30     if (new_size != size) {
    31         // realloc buffer
    31         // realloc buffer
    32         if ((buf = (char *) realloc((void *) buf, new_size)) == NULL)
    32         if ((buf = (char *) realloc((void *) buf, new_size)) == NULL)
    33             throw CL_Error("realloc failed");
    33             throw NetworkBufferError("realloc failed");
    34 
    34 
    35         // update size
    35         // update size
    36         size = new_size;
    36         size = new_size;
    37 
    37 
    38     } else if (new_size > (offset + item_size) * 4) {
    38     } else if (new_size > (offset + item_size) * 4) {
    40     }
    40     }
    41 }
    41 }
    42         
    42         
    43 void NetworkBuffer::trim (size_t prefix_size) {
    43 void NetworkBuffer::trim (size_t prefix_size) {
    44     // update offset
    44     // update offset
    45     offset -= prefix;
    45     offset -= prefix_size;
    46 
    46 
    47     // shift the buffer forwards from (buf + prefix) -> (buf), copying (old_offset - prefix) bytes
    47     // shift the buffer forwards from (buf + prefix) -> (buf), copying (old_offset - prefix) bytes
    48     memmove(buf, buf + ret, offset);
    48     memmove(buf, buf + prefix_size, offset);
    49 }
    49 }
    50      
    50      
    51 bool try_read (size_t item_size) {
    51 bool NetworkBuffer::try_read (size_t item_size) {
    52     int ret;
    52     int ret;
    53     size_t to_read = item_size;
    53     size_t to_read = item_size;
    54 
    54 
    55     // keept reads at at least NETWORK_CHUNK_SIZE bytes
    55     // keept reads at at least NETWORK_CHUNK_SIZE bytes
    56     if (to_read < NETWORK_TCP_CHUNK_SIZE)
    56     if (to_read < NETWORK_TCP_CHUNK_SIZE)
    68             return false;
    68             return false;
    69         else
    69         else
    70             throw;
    70             throw;
    71     }
    71     }
    72 
    72 
       
    73     assert(ret > 0);
       
    74 
    73     // update offset
    75     // update offset
    74     offset += ret;
    76     offset += ret;
    75 
    77 
    76     // did we get enough?
    78     // did we get enough?
    77     if (ret < item_size)
    79     if ((unsigned int) ret < item_size)
    78         return false;
    80         return false;
    79     else
    81     else
    80         return true;
    82         return true;
    81 } 
    83 } 
    82         
    84         
    83 bool peek_prefix (uint16_t &ref) {
    85 bool NetworkBuffer::peek_prefix (uint16_t &ref) {
    84     if (offset < sizeof(uint16_t))
    86     if (offset < sizeof(uint16_t))
    85         return false;
    87         return false;
    86 
    88 
    87     ret = ntohs(*((uint16_t *) (buf)));
    89     ref = ntohs(*((uint16_t *) (buf)));
    88 
    90 
    89     return true;
    91     return true;
    90 }
    92 }
    91     
    93     
    92 bool peek_prefix (uint32_t &ref) {
    94 bool NetworkBuffer::peek_prefix (uint32_t &ref) {
    93     if (offset < sizeof(uint32_t))
    95     if (offset < sizeof(uint32_t))
    94         return false;
    96         return false;
    95 
    97 
    96     ret = ntohl(*((uint32_t *) (buf)));
    98     ref = ntohl(*((uint32_t *) (buf)));
    97 
    99 
    98     return true;
   100     return true;
    99 }
   101 }
   100 
   102 
   101 template <typename PrefixType> PrefixType read_prefix (char *buf_ptr, size_t buf_max) {
   103 template <typename PrefixType> PrefixType NetworkBuffer::read_prefix (char *buf_ptr, size_t buf_max) {
   102     PrefixType prefix = 0;
   104     PrefixType prefix = 0;
   103     size_t missing = 0;
   105     size_t missing = 0;
   104     
   106     
   105     do {    
   107     do {    
   106         // do we have the prefix?
   108         // do we have the prefix?
   141 
   143 
   142     } else {
   144     } else {
   143         // trim the bytes out
   145         // trim the bytes out
   144         trim(prefix);
   146         trim(prefix);
   145         
   147         
   146         throw CL_Error("recv prefix overflow");   
   148         throw NetworkBufferError("recv prefix overflow");   
   147     }
   149     }
   148 }
   150 }
   149    
   151    
   150 void NetworkBuffer::push_write (char *buf_ptr, size_t buf_size) {
   152 void NetworkBuffer::push_write (char *buf_ptr, size_t buf_size) {
   151     int ret;
   153     int ret;
   164             ret = -1;
   166             ret = -1;
   165         }
   167         }
   166         
   168         
   167         // if we managed to send something, adjust buf/size and buffer
   169         // if we managed to send something, adjust buf/size and buffer
   168         if (ret > 0) {
   170         if (ret > 0) {
       
   171             // sanity-check
       
   172             assert(buf_size >= (unsigned int) ret);
       
   173 
   169             buf_ptr += ret;
   174             buf_ptr += ret;
   170             buf_size -= ret;
   175             buf_size -= ret;
   171 
       
   172             // sanity-check
       
   173             assert(buf_size >= 0);
       
   174 
   176 
   175             // if that was all, we're done
   177             // if that was all, we're done
   176             if (buf_size == 0)
   178             if (buf_size == 0)
   177                 return;
   179                 return;
   178         }
   180         }
   192     if (offset == 0)
   194     if (offset == 0)
   193         return;
   195         return;
   194     
   196     
   195     // attempt to write as much as possible
   197     // attempt to write as much as possible
   196     try {
   198     try {
   197         ret = socket.send(buf, offset)
   199         ret = socket.send(buf, offset);
   198 
   200 
   199     } catch (CL_Error &e) {
   201     } catch (CL_Error &e) {
   200         // ignore EAGAIN and just return
   202         // ignore EAGAIN and just return
   201         if (errno == EAGAIN)
   203         if (errno == EAGAIN)
   202             return;
   204             return;
   208     // trim the buffer
   210     // trim the buffer
   209     trim(ret);
   211     trim(ret);
   210 }
   212 }
   211         
   213         
   212 template <typename PrefixType> void NetworkBuffer::write_prefix (char *buf, PrefixType prefix) {
   214 template <typename PrefixType> void NetworkBuffer::write_prefix (char *buf, PrefixType prefix) {
   213     push_write(&prefix, sizeof(PrefixType)); 
   215     push_write((char*) &prefix, sizeof(PrefixType)); 
   214     push_write(buf, prefix);
   216     push_write(buf, prefix);
   215 }
   217 }
   216 
   218 
   217 
   219 
   218 NetworkTCPTransport::NetworkTCPTransport (CL_Socket socket) :
   220 NetworkTCPTransport::NetworkTCPTransport (NetworkSocket socket) :
   219     socket(socket), in(NETWORK_TCP_INITIAL_IN_BUF), out(NETWORK_TCP_INITIAL_OUT_BUF) {
   221     socket(socket), in(socket, NETWORK_TCP_INITIAL_IN_BUF), out(socket, NETWORK_TCP_INITIAL_OUT_BUF) {
   220     
   222     
   221     // use nonblocking sockets
   223     // use nonblocking sockets
   222     socket.set_nonblocking(true);
   224     socket.set_nonblocking(true);
   223 
   225 
   224     // connect signals
   226     // connect signals
   225     slots.connect(socket.sig_read_triggered(), this, &NetworkTCPTransport::on_read);
   227     slots.connect(socket.sig_read_triggered(), this, &NetworkTCPTransport::on_read);
   226     slots.connect(socket.sig_write_triggered(), this, &NetworkTCPTransport::on_write);
   228     slots.connect(socket.sig_write_triggered(), this, &NetworkTCPTransport::on_write);
   227     slots.connect(socket.sig_disconnected_triggered(), this, &NetworkTCPTransport::on_disconnected);
   229     slots.connect(socket.sig_disconnected(), this, &NetworkTCPTransport::on_disconnected);
   228 }
   230 }
   229 
   231 
   230 
   232 
   231 void NetworkTCPTransport::on_read (void) {
   233 void NetworkTCPTransport::on_read (void) {
   232     uint16_t prefix;
   234     uint16_t prefix;
   233     NetworkPacket packet;
   235     NetworkPacket packet;
   234     
   236     
   235     // let the in stream read length-prefixed packets and pass them on to handle_packet
   237     // let the in stream read length-prefixed packets and pass them on to handle_packet
   236     while ((prefix = in.read_prefix(packet.get_buf(), packet.get_buf_size)) > 0) {
   238     while ((prefix = in.read_prefix<uint16_t>(packet.get_buf(), packet.get_buf_size())) > 0) {
   237         packet.set_data_size(prefix);
   239         packet.set_data_size(prefix);
   238         _sig_packet(packet);
   240         _sig_packet(packet);
   239     }
   241     }
   240 }
   242 }
   241 
   243 
   254     
   256     
   255     if (prefix != packet.get_data_size())
   257     if (prefix != packet.get_data_size())
   256         throw CL_Error("send prefix overflow");
   258         throw CL_Error("send prefix overflow");
   257 
   259 
   258     // just write to the output buffer
   260     // just write to the output buffer
   259     out.write_prefix(packet.get_buf(), prefix);
   261     out.write_prefix<uint16_t>((char *) packet.get_buf(), prefix);
   260 }
   262 }
   261 
   263 
   262 NetworkTCPServer::NetworkTCPServer (const NetworkAddress &listen_addr) :
   264 NetworkTCPServer::NetworkTCPServer (const NetworkAddress &listen_addr) :
   263     socket(tcp, ipv4) {
   265     socket(CL_Socket::tcp, CL_Socket::ipv4) {
   264     
   266     
   265     // bind
   267     // bind
   266     socket.bind(listen_addr);
   268     socket.bind(listen_addr);
   267 
   269 
   268     // assign slots
   270     // assign slots
   282         
   284         
   283     // let our user handle it
   285     // let our user handle it
   284     _sig_client(client);
   286     _sig_client(client);
   285 }
   287 }
   286         
   288         
   287 virtual NetworkTCPTransport NetworkTCPServer::buildTransport (CL_Socket &socket) {
   289 NetworkTCPTransport* NetworkTCPServer::buildTransport (CL_Socket &socket) {
   288     return new NetworkTCPTransport(client_sock);
   290     return new NetworkTCPTransport(socket);
   289 }
   291 }
   290         
   292         
   291 NetworkTCPClient::NetworkTCPClient (const NetworkAddress &connect_addr) :
   293 NetworkTCPClient::NetworkTCPClient (const NetworkAddress &connect_addr) :
   292     NetworkTCPTransport(NetworkSocket(tcp, ipv4)) {
   294     NetworkTCPTransport(NetworkSocket(CL_Socket::tcp, CL_Socket::ipv4)) {
   293 
   295 
   294     // connect
   296     // connect
   295     socket.connect(connect_addr);
   297     socket.connect(connect_addr);
   296 
   298 
   297 }
   299 }