terom@31: terom@31: #include "NetworkTCP.hh" terom@31: terom@31: #include terom@31: #include terom@31: terom@31: NetworkBuffer::NetworkBuffer (NetworkSocket &socket, size_t size_hint) : terom@31: socket(socket), buf(0), size(0), offset(0) { terom@31: terom@31: // allocate initial buffer terom@31: if ((buf = (char *) malloc(size_hint)) == NULL) terom@31: throw CL_Error("malloc failed"); terom@31: terom@31: // remember size terom@31: size = size_hint; terom@31: } terom@32: terom@32: ~NetworkBuffer::NetworkBuffer (void) { terom@32: free(buf); terom@32: } terom@31: terom@32: void NetworkBuffer::resize (size_t suffix_size) { terom@32: size_t new_size = size; terom@32: terom@32: // grow new_size until item_size fits terom@32: while (offset + item_size > new_size) terom@32: new_size *= 2; terom@31: terom@32: // grow if needed terom@32: if (new_size != size) { terom@32: // realloc buffer terom@32: if ((buf = (char *) realloc((void *) buf, new_size)) == NULL) terom@32: throw CL_Error("realloc failed"); terom@32: terom@32: // update size terom@32: size = new_size; terom@32: terom@32: } else if (new_size > (offset + item_size) * 4) { terom@32: // XXX: shrink? terom@32: } terom@31: } terom@31: terom@32: void NetworkBuffer::trim (size_t prefix_size) { terom@32: // update offset terom@32: offset -= prefix; terom@32: terom@32: // shift the buffer forwards from (buf + prefix) -> (buf), copying (old_offset - prefix) bytes terom@32: memmove(buf, buf + ret, offset); terom@32: } terom@32: terom@32: bool try_read (size_t item_size) { terom@32: int ret; terom@32: size_t to_read = item_size; terom@32: terom@32: // keept reads at at least NETWORK_CHUNK_SIZE bytes terom@32: if (to_read < NETWORK_TCP_CHUNK_SIZE) terom@32: to_read = NETWORK_TCP_CHUNK_SIZE; terom@32: terom@32: // resize buffer if needed terom@32: resize(to_read); terom@32: terom@32: // read once terom@32: try { terom@32: ret = socket.recv(buf + offset, to_read); terom@32: terom@32: } catch (CL_Error &e) { terom@32: if (errno == EAGAIN) terom@32: return false; terom@32: else terom@32: throw; terom@32: } terom@32: terom@32: // update offset terom@32: offset += ret; terom@32: terom@32: // did we get enough? terom@32: if (ret < item_size) terom@32: return false; terom@32: else terom@32: return true; terom@32: } terom@32: terom@32: bool peek_prefix (uint16_t &ref) { terom@32: if (offset < sizeof(uint16_t)) terom@32: return false; terom@32: terom@32: ret = ntohs(*((uint16_t *) (buf))); terom@32: terom@32: return true; terom@32: } terom@32: terom@32: bool peek_prefix (uint32_t &ref) { terom@32: if (offset < sizeof(uint32_t)) terom@32: return false; terom@32: terom@32: ret = ntohl(*((uint32_t *) (buf))); terom@32: terom@32: return true; terom@32: } terom@32: terom@32: template PrefixType read_prefix (char *buf_ptr, size_t buf_max) { terom@32: PrefixType prefix = 0; terom@32: size_t missing = 0; terom@32: terom@32: do { terom@32: // do we have the prefix? terom@32: if (peek_prefix(prefix)) { terom@32: // do we already have the payload? terom@32: if (offset >= sizeof(PrefixType) + prefix) { terom@32: break; terom@32: terom@32: } else { terom@32: missing = (sizeof(PrefixType) + prefix) - offset; terom@32: } terom@32: terom@32: } else { terom@32: missing = sizeof(PrefixType); terom@32: } terom@32: terom@32: // sanity-check terom@32: assert(missing); terom@32: terom@32: // try and read the missing data terom@32: if (try_read(missing) == false) { terom@32: // if unable to read what we need, return zero. terom@32: return 0; terom@32: } terom@32: terom@32: // assess the situation again terom@32: } while (true); terom@32: terom@32: // copy the data over, unless it's too large terom@32: if (prefix <= buf_max) { terom@32: memcpy(buf_ptr, buf, prefix); terom@32: terom@32: // trim the bytes out terom@32: trim(prefix); terom@32: terom@32: // return terom@32: return prefix; terom@32: terom@32: } else { terom@32: // trim the bytes out terom@32: trim(prefix); terom@32: terom@32: throw CL_Error("recv prefix overflow"); terom@32: } terom@32: } terom@32: terom@32: void NetworkBuffer::push_write (char *buf_ptr, size_t buf_size) { terom@31: int ret; terom@31: terom@31: // try and short-circuit writes unless we have already buffered data terom@31: if (offset == 0) { terom@31: try { terom@31: // attempt to send something terom@31: ret = socket.send(buf_ptr, buf_size); terom@31: terom@31: } catch (CL_Error &e) { terom@31: // ignore EAGAIN, detect this by setting ret to -1 terom@31: if (errno != EAGAIN) terom@31: throw; terom@31: terom@31: ret = -1; terom@31: } terom@31: terom@31: // if we managed to send something, adjust buf/size and buffer terom@31: if (ret > 0) { terom@31: buf_ptr += ret; terom@31: buf_size -= ret; terom@31: terom@31: // sanity-check terom@31: assert(buf_size >= 0); terom@31: terom@31: // if that was all, we're done terom@31: if (buf_size == 0) terom@31: return; terom@31: } terom@31: } terom@31: terom@32: // resize to fit buf_size more bytes terom@32: resize(buf_size); terom@31: terom@31: // copy into our internal buffer terom@31: memcpy(buf + offset, buf_ptr, buf_size); terom@31: } terom@31: terom@31: void NetworkBuffer::flush_write (void) { terom@31: int ret; terom@31: terom@31: // ignore if we don't have any data buffered terom@31: if (offset == 0) terom@31: return; terom@31: terom@31: // attempt to write as much as possible terom@31: try { terom@31: ret = socket.send(buf, offset) terom@31: terom@31: } catch (CL_Error &e) { terom@31: // ignore EAGAIN and just return terom@31: if (errno == EAGAIN) terom@31: return; terom@31: terom@31: else terom@31: throw; terom@31: } terom@31: terom@32: // trim the buffer terom@32: trim(ret); terom@32: } terom@32: terom@32: template void NetworkBuffer::write_prefix (char *buf, PrefixType prefix) { terom@32: push_write(&prefix, sizeof(PrefixType)); terom@32: push_write(buf, prefix); terom@32: } terom@31: terom@31: terom@31: NetworkTCPTransport::NetworkTCPTransport (CL_Socket socket) : terom@31: socket(socket), in(NETWORK_TCP_INITIAL_IN_BUF), out(NETWORK_TCP_INITIAL_OUT_BUF) { terom@31: terom@31: // use nonblocking sockets terom@31: socket.set_nonblocking(true); terom@31: terom@31: // connect signals terom@31: slots.connect(socket.sig_read_triggered(), this, &NetworkTCPTransport::on_read); terom@31: slots.connect(socket.sig_write_triggered(), this, &NetworkTCPTransport::on_write); terom@31: slots.connect(socket.sig_disconnected_triggered(), this, &NetworkTCPTransport::on_disconnected); terom@31: } terom@31: terom@31: terom@31: void NetworkTCPTransport::on_read (void) { terom@32: uint16_t prefix; terom@31: NetworkPacket packet; terom@32: terom@32: // let the in stream read length-prefixed packets and pass them on to handle_packet terom@32: while ((prefix = in.read_prefix(packet.get_buf(), packet.get_buf_size)) > 0) { terom@32: packet.set_data_size(prefix); terom@34: _sig_packet(packet); terom@32: } terom@31: } terom@31: terom@31: void NetworkTCPTransport::on_write (void) { terom@31: // just flush the output buffer terom@31: out.flush_write(); terom@31: } terom@31: terom@31: void NetworkTCPTransport::on_disconnected (void) { terom@32: // pass right through terom@34: _sig_disconnect(); terom@31: } terom@31: terom@31: void NetworkTCPTransport::write_packet (const NetworkPacket &packet) { terom@32: uint16_t prefix = packet.get_data_size(); terom@32: terom@32: if (prefix != packet.get_data_size()) terom@32: throw CL_Error("send prefix overflow"); terom@32: terom@31: // just write to the output buffer terom@32: out.write_prefix(packet.get_buf(), prefix); terom@31: } terom@31: terom@31: NetworkTCPServer::NetworkTCPServer (const NetworkAddress &listen_addr) : terom@31: socket(tcp, ipv4) { terom@31: terom@31: // bind terom@31: socket.bind(listen_addr); terom@31: terom@31: // assign slots terom@31: slots.connect(socket.sig_read_triggered(), this, &NetworkTCPServer::on_accept); terom@31: terom@31: // listen terom@31: socket.listen(NETWORK_LISTEN_BACKLOG); terom@31: } terom@31: terom@31: terom@31: void NetworkTCPServer::on_accept (void) { terom@31: // accept a new socket terom@31: NetworkSocket client_sock = socket.accept(); terom@31: terom@31: // create a new NetworkTCPTransport terom@33: NetworkTCPTransport *client = buildTransport(client_sock); terom@33: terom@31: // let our user handle it terom@34: _sig_client(client); terom@31: } terom@31: terom@33: virtual NetworkTCPTransport NetworkTCPServer::buildTransport (CL_Socket &socket) { terom@33: return new NetworkTCPTransport(client_sock); terom@33: } terom@33: terom@31: NetworkTCPClient::NetworkTCPClient (const NetworkAddress &connect_addr) : terom@31: NetworkTCPTransport(NetworkSocket(tcp, ipv4)) { terom@31: terom@31: // connect terom@31: socket.connect(connect_addr); terom@31: terom@31: }