src/proto2/NetworkTCP.cc
branchno-netsession
changeset 32 2ff929186c90
parent 31 d0d7489d4e8b
child 33 e53f09b378f4
equal deleted inserted replaced
31:d0d7489d4e8b 32:2ff929186c90
    12        throw CL_Error("malloc failed");
    12        throw CL_Error("malloc failed");
    13     
    13     
    14     // remember size
    14     // remember size
    15     size = size_hint;
    15     size = size_hint;
    16 }
    16 }
    17 
    17         
    18 void NetworkBuffer::resize (size_t new_size) {
    18 ~NetworkBuffer::NetworkBuffer (void) {
    19     // realloc buffer
    19     free(buf);
    20     if ((buf = (char *) realloc((void *) buf, new_size)) == NULL)
    20 }
    21         throw CL_Error("realloc failed");
    21 
    22     
    22 void NetworkBuffer::resize (size_t suffix_size) {
    23     // update size
    23     size_t new_size = size;
    24     size = new_size;
    24 
    25 }
    25     // grow new_size until item_size fits
    26         
    26     while (offset + item_size > new_size)
    27 void NetworkBuffer::write (const char *buf_ptr, size_t buf_size) {
    27         new_size *= 2;
       
    28     
       
    29     // grow if needed
       
    30     if (new_size != size) {
       
    31         // realloc buffer
       
    32         if ((buf = (char *) realloc((void *) buf, new_size)) == NULL)
       
    33             throw CL_Error("realloc failed");
       
    34 
       
    35         // update size
       
    36         size = new_size;
       
    37 
       
    38     } else if (new_size > (offset + item_size) * 4) {
       
    39         // XXX: shrink?
       
    40     }
       
    41 }
       
    42         
       
    43 void NetworkBuffer::trim (size_t prefix_size) {
       
    44     // update offset
       
    45     offset -= prefix;
       
    46 
       
    47     // shift the buffer forwards from (buf + prefix) -> (buf), copying (old_offset - prefix) bytes
       
    48     memmove(buf, buf + ret, offset);
       
    49 }
       
    50      
       
    51 bool try_read (size_t item_size) {
       
    52     int ret;
       
    53     size_t to_read = item_size;
       
    54 
       
    55     // keept reads at at least NETWORK_CHUNK_SIZE bytes
       
    56     if (to_read < NETWORK_TCP_CHUNK_SIZE)
       
    57         to_read = NETWORK_TCP_CHUNK_SIZE;
       
    58 
       
    59     // resize buffer if needed
       
    60     resize(to_read);
       
    61 
       
    62     // read once
       
    63     try {
       
    64         ret = socket.recv(buf + offset, to_read);
       
    65 
       
    66     } catch (CL_Error &e) {
       
    67         if (errno == EAGAIN)
       
    68             return false;
       
    69         else
       
    70             throw;
       
    71     }
       
    72 
       
    73     // update offset
       
    74     offset += ret;
       
    75 
       
    76     // did we get enough?
       
    77     if (ret < item_size)
       
    78         return false;
       
    79     else
       
    80         return true;
       
    81 } 
       
    82         
       
    83 bool peek_prefix (uint16_t &ref) {
       
    84     if (offset < sizeof(uint16_t))
       
    85         return false;
       
    86 
       
    87     ret = ntohs(*((uint16_t *) (buf)));
       
    88 
       
    89     return true;
       
    90 }
       
    91     
       
    92 bool peek_prefix (uint32_t &ref) {
       
    93     if (offset < sizeof(uint32_t))
       
    94         return false;
       
    95 
       
    96     ret = ntohl(*((uint32_t *) (buf)));
       
    97 
       
    98     return true;
       
    99 }
       
   100 
       
   101 template <typename PrefixType> PrefixType read_prefix (char *buf_ptr, size_t buf_max) {
       
   102     PrefixType prefix = 0;
       
   103     size_t missing = 0;
       
   104     
       
   105     do {    
       
   106         // do we have the prefix?
       
   107         if (peek_prefix(prefix)) {
       
   108             // do we already have the payload?
       
   109             if (offset >= sizeof(PrefixType) + prefix) {
       
   110                 break;
       
   111 
       
   112             } else {
       
   113                 missing = (sizeof(PrefixType) + prefix) - offset;
       
   114             }
       
   115 
       
   116         } else {
       
   117             missing = sizeof(PrefixType);
       
   118         }
       
   119 
       
   120         // sanity-check
       
   121         assert(missing);
       
   122         
       
   123         // try and read the missing data
       
   124         if (try_read(missing) == false) {
       
   125             // if unable to read what we need, return zero.
       
   126             return 0;
       
   127         }
       
   128         
       
   129         // assess the situation again
       
   130     } while (true);
       
   131     
       
   132     // copy the data over, unless it's too large
       
   133     if (prefix <= buf_max) {
       
   134         memcpy(buf_ptr, buf, prefix);
       
   135     
       
   136         // trim the bytes out
       
   137         trim(prefix);
       
   138         
       
   139         // return
       
   140         return prefix;
       
   141 
       
   142     } else {
       
   143         // trim the bytes out
       
   144         trim(prefix);
       
   145         
       
   146         throw CL_Error("recv prefix overflow");   
       
   147     }
       
   148 }
       
   149    
       
   150 void NetworkBuffer::push_write (char *buf_ptr, size_t buf_size) {
    28     int ret;
   151     int ret;
    29 
   152 
    30     // try and short-circuit writes unless we have already buffered data
   153     // try and short-circuit writes unless we have already buffered data
    31     if (offset == 0) {
   154     if (offset == 0) {
    32         try {
   155         try {
    53             if (buf_size == 0)
   176             if (buf_size == 0)
    54                 return;
   177                 return;
    55         }
   178         }
    56     }
   179     }
    57     
   180     
    58     size_t new_size = size;
   181     // resize to fit buf_size more bytes
    59     
   182     resize(buf_size);
    60     // calcluate new buffer size
   183     
    61     while (offset + buf_size > new_size)
       
    62         new_size *= 2;
       
    63     
       
    64     // grow internal buffer if needed
       
    65     if (new_size != size)
       
    66         resize(new_size);
       
    67 
       
    68     // copy into our internal buffer
   184     // copy into our internal buffer
    69     memcpy(buf + offset, buf_ptr, buf_size);
   185     memcpy(buf + offset, buf_ptr, buf_size);
    70 }
   186 }
    71         
   187         
    72 void NetworkBuffer::flush_write (void) {
   188 void NetworkBuffer::flush_write (void) {
    87 
   203 
    88         else
   204         else
    89             throw;
   205             throw;
    90     }
   206     }
    91 
   207 
    92     // update offset
   208     // trim the buffer
    93     offset -= ret;
   209     trim(ret);
    94 
   210 }
    95     // shift the buffer forwards from (buf + ret) -> (buf), copying (old_offset - ret) bytes
   211         
    96     memmove(buf, buf + ret, offset);
   212 template <typename PrefixType> void NetworkBuffer::write_prefix (char *buf, PrefixType prefix) {
    97 }
   213     push_write(&prefix, sizeof(PrefixType)); 
       
   214     push_write(buf, prefix);
       
   215 }
       
   216 
    98 
   217 
    99 NetworkTCPTransport::NetworkTCPTransport (CL_Socket socket) :
   218 NetworkTCPTransport::NetworkTCPTransport (CL_Socket socket) :
   100     socket(socket), in(NETWORK_TCP_INITIAL_IN_BUF), out(NETWORK_TCP_INITIAL_OUT_BUF) {
   219     socket(socket), in(NETWORK_TCP_INITIAL_IN_BUF), out(NETWORK_TCP_INITIAL_OUT_BUF) {
   101     
   220     
   102     // use nonblocking sockets
   221     // use nonblocking sockets
   108     slots.connect(socket.sig_disconnected_triggered(), this, &NetworkTCPTransport::on_disconnected);
   227     slots.connect(socket.sig_disconnected_triggered(), this, &NetworkTCPTransport::on_disconnected);
   109 }
   228 }
   110 
   229 
   111 
   230 
   112 void NetworkTCPTransport::on_read (void) {
   231 void NetworkTCPTransport::on_read (void) {
       
   232     uint16_t prefix;
   113     NetworkPacket packet;
   233     NetworkPacket packet;
   114 
   234     
   115     do {
   235     // let the in stream read length-prefixed packets and pass them on to handle_packet
   116         size_t to_read = 0;
   236     while ((prefix = in.read_prefix(packet.get_buf(), packet.get_buf_size)) > 0) {
   117 
   237         packet.set_data_size(prefix);
   118         // guess how much data to receive based on either the given length prefix or our minimim chunk size
   238         sig_packet(packet);
   119         if (in.read_prefix<uint16_t>(to_read) == false || to_read < NETWORK_TCP_CHUNK_SIZE)
   239     }
   120             to_read = NETWORK_TCP_CHUNK_SIZE 
       
   121         
       
   122         // do the recv
       
   123         if (in.recv(socket, to_read) == -1)
       
   124         
       
   125         // read out any packets
       
   126         while (in.read_prefix_packet<uint16_t>(packet)) {
       
   127             handle_packet(packet);
       
   128         }
       
   129     } while (...);
       
   130 }
   240 }
   131 
   241 
   132 void NetworkTCPTransport::on_write (void) {
   242 void NetworkTCPTransport::on_write (void) {
   133     // just flush the output buffer
   243     // just flush the output buffer
   134     out.flush_write();
   244     out.flush_write();
   135 }
   245 }
   136 
   246 
   137 void NetworkTCPTransport::on_disconnected (void) {
   247 void NetworkTCPTransport::on_disconnected (void) {
   138 
   248     // pass right through
       
   249     sig_disconnect();
   139 }
   250 }
   140         
   251         
   141 void NetworkTCPTransport::write_packet (const NetworkPacket &packet) {
   252 void NetworkTCPTransport::write_packet (const NetworkPacket &packet) {
       
   253     uint16_t prefix = packet.get_data_size();
       
   254     
       
   255     if (prefix != packet.get_data_size())
       
   256         throw CL_Error("send prefix overflow");
       
   257 
   142     // just write to the output buffer
   258     // just write to the output buffer
   143     out.write(packet.get_buf(), packet.get_size());
   259     out.write_prefix(packet.get_buf(), prefix);
   144 }
   260 }
   145 
   261 
   146 NetworkTCPServer::NetworkTCPServer (const NetworkAddress &listen_addr) :
   262 NetworkTCPServer::NetworkTCPServer (const NetworkAddress &listen_addr) :
   147     socket(tcp, ipv4) {
   263     socket(tcp, ipv4) {
   148     
   264     
   163 
   279 
   164     // create a new NetworkTCPTransport
   280     // create a new NetworkTCPTransport
   165     NetworkTCPTransport *client = new NetworkTCPTransport(client_sock);
   281     NetworkTCPTransport *client = new NetworkTCPTransport(client_sock);
   166 
   282 
   167     // let our user handle it
   283     // let our user handle it
   168     handle_client(client);
   284     sig_client(client);
   169 }
   285 }
   170         
   286         
   171 NetworkTCPClient::NetworkTCPClient (const NetworkAddress &connect_addr) :
   287 NetworkTCPClient::NetworkTCPClient (const NetworkAddress &connect_addr) :
   172     NetworkTCPTransport(NetworkSocket(tcp, ipv4)) {
   288     NetworkTCPTransport(NetworkSocket(tcp, ipv4)) {
   173 
   289