src/proto2/NetworkTCP.cc
branchno-netsession
changeset 32 2ff929186c90
parent 31 d0d7489d4e8b
child 33 e53f09b378f4
--- a/src/proto2/NetworkTCP.cc	Mon Nov 10 16:49:09 2008 +0000
+++ b/src/proto2/NetworkTCP.cc	Mon Nov 10 18:21:23 2008 +0000
@@ -14,17 +14,140 @@
     // remember size
     size = size_hint;
 }
+        
+~NetworkBuffer::NetworkBuffer (void) {
+    free(buf);
+}
 
-void NetworkBuffer::resize (size_t new_size) {
-    // realloc buffer
-    if ((buf = (char *) realloc((void *) buf, new_size)) == NULL)
-        throw CL_Error("realloc failed");
+void NetworkBuffer::resize (size_t suffix_size) {
+    size_t new_size = size;
+
+    // grow new_size until item_size fits
+    while (offset + item_size > new_size)
+        new_size *= 2;
     
-    // update size
-    size = new_size;
+    // grow if needed
+    if (new_size != size) {
+        // realloc buffer
+        if ((buf = (char *) realloc((void *) buf, new_size)) == NULL)
+            throw CL_Error("realloc failed");
+
+        // update size
+        size = new_size;
+
+    } else if (new_size > (offset + item_size) * 4) {
+        // XXX: shrink?
+    }
 }
         
-void NetworkBuffer::write (const char *buf_ptr, size_t buf_size) {
+void NetworkBuffer::trim (size_t prefix_size) {
+    // update offset
+    offset -= prefix;
+
+    // shift the buffer forwards from (buf + prefix) -> (buf), copying (old_offset - prefix) bytes
+    memmove(buf, buf + ret, offset);
+}
+     
+bool try_read (size_t item_size) {
+    int ret;
+    size_t to_read = item_size;
+
+    // keept reads at at least NETWORK_CHUNK_SIZE bytes
+    if (to_read < NETWORK_TCP_CHUNK_SIZE)
+        to_read = NETWORK_TCP_CHUNK_SIZE;
+
+    // resize buffer if needed
+    resize(to_read);
+
+    // read once
+    try {
+        ret = socket.recv(buf + offset, to_read);
+
+    } catch (CL_Error &e) {
+        if (errno == EAGAIN)
+            return false;
+        else
+            throw;
+    }
+
+    // update offset
+    offset += ret;
+
+    // did we get enough?
+    if (ret < item_size)
+        return false;
+    else
+        return true;
+} 
+        
+bool peek_prefix (uint16_t &ref) {
+    if (offset < sizeof(uint16_t))
+        return false;
+
+    ret = ntohs(*((uint16_t *) (buf)));
+
+    return true;
+}
+    
+bool peek_prefix (uint32_t &ref) {
+    if (offset < sizeof(uint32_t))
+        return false;
+
+    ret = ntohl(*((uint32_t *) (buf)));
+
+    return true;
+}
+
+template <typename PrefixType> PrefixType read_prefix (char *buf_ptr, size_t buf_max) {
+    PrefixType prefix = 0;
+    size_t missing = 0;
+    
+    do {    
+        // do we have the prefix?
+        if (peek_prefix(prefix)) {
+            // do we already have the payload?
+            if (offset >= sizeof(PrefixType) + prefix) {
+                break;
+
+            } else {
+                missing = (sizeof(PrefixType) + prefix) - offset;
+            }
+
+        } else {
+            missing = sizeof(PrefixType);
+        }
+
+        // sanity-check
+        assert(missing);
+        
+        // try and read the missing data
+        if (try_read(missing) == false) {
+            // if unable to read what we need, return zero.
+            return 0;
+        }
+        
+        // assess the situation again
+    } while (true);
+    
+    // copy the data over, unless it's too large
+    if (prefix <= buf_max) {
+        memcpy(buf_ptr, buf, prefix);
+    
+        // trim the bytes out
+        trim(prefix);
+        
+        // return
+        return prefix;
+
+    } else {
+        // trim the bytes out
+        trim(prefix);
+        
+        throw CL_Error("recv prefix overflow");   
+    }
+}
+   
+void NetworkBuffer::push_write (char *buf_ptr, size_t buf_size) {
     int ret;
 
     // try and short-circuit writes unless we have already buffered data
@@ -55,16 +178,9 @@
         }
     }
     
-    size_t new_size = size;
+    // resize to fit buf_size more bytes
+    resize(buf_size);
     
-    // calcluate new buffer size
-    while (offset + buf_size > new_size)
-        new_size *= 2;
-    
-    // grow internal buffer if needed
-    if (new_size != size)
-        resize(new_size);
-
     // copy into our internal buffer
     memcpy(buf + offset, buf_ptr, buf_size);
 }
@@ -89,12 +205,15 @@
             throw;
     }
 
-    // update offset
-    offset -= ret;
+    // trim the buffer
+    trim(ret);
+}
+        
+template <typename PrefixType> void NetworkBuffer::write_prefix (char *buf, PrefixType prefix) {
+    push_write(&prefix, sizeof(PrefixType)); 
+    push_write(buf, prefix);
+}
 
-    // shift the buffer forwards from (buf + ret) -> (buf), copying (old_offset - ret) bytes
-    memmove(buf, buf + ret, offset);
-}
 
 NetworkTCPTransport::NetworkTCPTransport (CL_Socket socket) :
     socket(socket), in(NETWORK_TCP_INITIAL_IN_BUF), out(NETWORK_TCP_INITIAL_OUT_BUF) {
@@ -110,23 +229,14 @@
 
 
 void NetworkTCPTransport::on_read (void) {
+    uint16_t prefix;
     NetworkPacket packet;
-
-    do {
-        size_t to_read = 0;
-
-        // guess how much data to receive based on either the given length prefix or our minimim chunk size
-        if (in.read_prefix<uint16_t>(to_read) == false || to_read < NETWORK_TCP_CHUNK_SIZE)
-            to_read = NETWORK_TCP_CHUNK_SIZE 
-        
-        // do the recv
-        if (in.recv(socket, to_read) == -1)
-        
-        // read out any packets
-        while (in.read_prefix_packet<uint16_t>(packet)) {
-            handle_packet(packet);
-        }
-    } while (...);
+    
+    // let the in stream read length-prefixed packets and pass them on to handle_packet
+    while ((prefix = in.read_prefix(packet.get_buf(), packet.get_buf_size)) > 0) {
+        packet.set_data_size(prefix);
+        sig_packet(packet);
+    }
 }
 
 void NetworkTCPTransport::on_write (void) {
@@ -135,12 +245,18 @@
 }
 
 void NetworkTCPTransport::on_disconnected (void) {
-
+    // pass right through
+    sig_disconnect();
 }
         
 void NetworkTCPTransport::write_packet (const NetworkPacket &packet) {
+    uint16_t prefix = packet.get_data_size();
+    
+    if (prefix != packet.get_data_size())
+        throw CL_Error("send prefix overflow");
+
     // just write to the output buffer
-    out.write(packet.get_buf(), packet.get_size());
+    out.write_prefix(packet.get_buf(), prefix);
 }
 
 NetworkTCPServer::NetworkTCPServer (const NetworkAddress &listen_addr) :
@@ -165,7 +281,7 @@
     NetworkTCPTransport *client = new NetworkTCPTransport(client_sock);
 
     // let our user handle it
-    handle_client(client);
+    sig_client(client);
 }
         
 NetworkTCPClient::NetworkTCPClient (const NetworkAddress &connect_addr) :