--- a/src/proto2/NetworkTCP.cc Mon Nov 10 16:49:09 2008 +0000
+++ b/src/proto2/NetworkTCP.cc Mon Nov 10 18:21:23 2008 +0000
@@ -14,17 +14,140 @@
// remember size
size = size_hint;
}
+
+~NetworkBuffer::NetworkBuffer (void) {
+ free(buf);
+}
-void NetworkBuffer::resize (size_t new_size) {
- // realloc buffer
- if ((buf = (char *) realloc((void *) buf, new_size)) == NULL)
- throw CL_Error("realloc failed");
+void NetworkBuffer::resize (size_t suffix_size) {
+ size_t new_size = size;
+
+ // grow new_size until item_size fits
+ while (offset + item_size > new_size)
+ new_size *= 2;
- // update size
- size = new_size;
+ // grow if needed
+ if (new_size != size) {
+ // realloc buffer
+ if ((buf = (char *) realloc((void *) buf, new_size)) == NULL)
+ throw CL_Error("realloc failed");
+
+ // update size
+ size = new_size;
+
+ } else if (new_size > (offset + item_size) * 4) {
+ // XXX: shrink?
+ }
}
-void NetworkBuffer::write (const char *buf_ptr, size_t buf_size) {
+void NetworkBuffer::trim (size_t prefix_size) {
+ // update offset
+ offset -= prefix;
+
+ // shift the buffer forwards from (buf + prefix) -> (buf), copying (old_offset - prefix) bytes
+ memmove(buf, buf + ret, offset);
+}
+
+bool try_read (size_t item_size) {
+ int ret;
+ size_t to_read = item_size;
+
+ // keept reads at at least NETWORK_CHUNK_SIZE bytes
+ if (to_read < NETWORK_TCP_CHUNK_SIZE)
+ to_read = NETWORK_TCP_CHUNK_SIZE;
+
+ // resize buffer if needed
+ resize(to_read);
+
+ // read once
+ try {
+ ret = socket.recv(buf + offset, to_read);
+
+ } catch (CL_Error &e) {
+ if (errno == EAGAIN)
+ return false;
+ else
+ throw;
+ }
+
+ // update offset
+ offset += ret;
+
+ // did we get enough?
+ if (ret < item_size)
+ return false;
+ else
+ return true;
+}
+
+bool peek_prefix (uint16_t &ref) {
+ if (offset < sizeof(uint16_t))
+ return false;
+
+ ret = ntohs(*((uint16_t *) (buf)));
+
+ return true;
+}
+
+bool peek_prefix (uint32_t &ref) {
+ if (offset < sizeof(uint32_t))
+ return false;
+
+ ret = ntohl(*((uint32_t *) (buf)));
+
+ return true;
+}
+
+template <typename PrefixType> PrefixType read_prefix (char *buf_ptr, size_t buf_max) {
+ PrefixType prefix = 0;
+ size_t missing = 0;
+
+ do {
+ // do we have the prefix?
+ if (peek_prefix(prefix)) {
+ // do we already have the payload?
+ if (offset >= sizeof(PrefixType) + prefix) {
+ break;
+
+ } else {
+ missing = (sizeof(PrefixType) + prefix) - offset;
+ }
+
+ } else {
+ missing = sizeof(PrefixType);
+ }
+
+ // sanity-check
+ assert(missing);
+
+ // try and read the missing data
+ if (try_read(missing) == false) {
+ // if unable to read what we need, return zero.
+ return 0;
+ }
+
+ // assess the situation again
+ } while (true);
+
+ // copy the data over, unless it's too large
+ if (prefix <= buf_max) {
+ memcpy(buf_ptr, buf, prefix);
+
+ // trim the bytes out
+ trim(prefix);
+
+ // return
+ return prefix;
+
+ } else {
+ // trim the bytes out
+ trim(prefix);
+
+ throw CL_Error("recv prefix overflow");
+ }
+}
+
+void NetworkBuffer::push_write (char *buf_ptr, size_t buf_size) {
int ret;
// try and short-circuit writes unless we have already buffered data
@@ -55,16 +178,9 @@
}
}
- size_t new_size = size;
+ // resize to fit buf_size more bytes
+ resize(buf_size);
- // calcluate new buffer size
- while (offset + buf_size > new_size)
- new_size *= 2;
-
- // grow internal buffer if needed
- if (new_size != size)
- resize(new_size);
-
// copy into our internal buffer
memcpy(buf + offset, buf_ptr, buf_size);
}
@@ -89,12 +205,15 @@
throw;
}
- // update offset
- offset -= ret;
+ // trim the buffer
+ trim(ret);
+}
+
+template <typename PrefixType> void NetworkBuffer::write_prefix (char *buf, PrefixType prefix) {
+ push_write(&prefix, sizeof(PrefixType));
+ push_write(buf, prefix);
+}
- // shift the buffer forwards from (buf + ret) -> (buf), copying (old_offset - ret) bytes
- memmove(buf, buf + ret, offset);
-}
NetworkTCPTransport::NetworkTCPTransport (CL_Socket socket) :
socket(socket), in(NETWORK_TCP_INITIAL_IN_BUF), out(NETWORK_TCP_INITIAL_OUT_BUF) {
@@ -110,23 +229,14 @@
void NetworkTCPTransport::on_read (void) {
+ uint16_t prefix;
NetworkPacket packet;
-
- do {
- size_t to_read = 0;
-
- // guess how much data to receive based on either the given length prefix or our minimim chunk size
- if (in.read_prefix<uint16_t>(to_read) == false || to_read < NETWORK_TCP_CHUNK_SIZE)
- to_read = NETWORK_TCP_CHUNK_SIZE
-
- // do the recv
- if (in.recv(socket, to_read) == -1)
-
- // read out any packets
- while (in.read_prefix_packet<uint16_t>(packet)) {
- handle_packet(packet);
- }
- } while (...);
+
+ // let the in stream read length-prefixed packets and pass them on to handle_packet
+ while ((prefix = in.read_prefix(packet.get_buf(), packet.get_buf_size)) > 0) {
+ packet.set_data_size(prefix);
+ sig_packet(packet);
+ }
}
void NetworkTCPTransport::on_write (void) {
@@ -135,12 +245,18 @@
}
void NetworkTCPTransport::on_disconnected (void) {
-
+ // pass right through
+ sig_disconnect();
}
void NetworkTCPTransport::write_packet (const NetworkPacket &packet) {
+ uint16_t prefix = packet.get_data_size();
+
+ if (prefix != packet.get_data_size())
+ throw CL_Error("send prefix overflow");
+
// just write to the output buffer
- out.write(packet.get_buf(), packet.get_size());
+ out.write_prefix(packet.get_buf(), prefix);
}
NetworkTCPServer::NetworkTCPServer (const NetworkAddress &listen_addr) :
@@ -165,7 +281,7 @@
NetworkTCPTransport *client = new NetworkTCPTransport(client_sock);
// let our user handle it
- handle_client(client);
+ sig_client(client);
}
NetworkTCPClient::NetworkTCPClient (const NetworkAddress &connect_addr) :