terom@89: terom@89: #include "NetworkTCP.hh" terom@89: #include "Engine.hh" terom@89: terom@89: #include terom@89: #include terom@89: terom@89: NetworkBuffer::NetworkBuffer (NetworkSocket &socket, size_t size_hint) : terom@89: socket(socket), buf(0), size(0), offset(0) { terom@89: terom@89: // allocate initial buffer terom@89: if ((buf = (char *) malloc(size_hint)) == NULL) terom@89: throw NetworkBufferError("malloc failed"); terom@89: terom@89: // remember size terom@89: size = size_hint; terom@89: } terom@89: terom@89: NetworkBuffer::~NetworkBuffer (void) { terom@89: free(buf); terom@89: } terom@89: terom@89: void NetworkBuffer::resize (size_t item_size) { terom@89: size_t new_size = size; terom@89: terom@89: // grow new_size until item_size fits terom@89: while (offset + item_size > new_size) terom@89: new_size *= 2; terom@89: terom@89: // grow if needed terom@89: if (new_size != size) { terom@89: // realloc buffer terom@89: if ((buf = (char *) realloc((void *) buf, new_size)) == NULL) terom@89: throw NetworkBufferError("realloc failed"); terom@89: terom@89: // update size terom@89: size = new_size; terom@89: terom@89: } else if (new_size > (offset + item_size) * 4) { terom@89: // XXX: shrink? terom@89: } terom@89: } terom@89: terom@89: void NetworkBuffer::trim (size_t prefix_size) { terom@89: // update offset terom@89: offset -= prefix_size; terom@89: terom@89: // shift the buffer forwards from (buf + prefix) -> (buf), copying (old_offset - prefix) bytes terom@89: memmove(buf, buf + prefix_size, offset); terom@89: } terom@89: terom@89: bool NetworkBuffer::try_read (size_t item_size) { terom@89: int ret; terom@89: size_t to_read = item_size; terom@89: terom@89: // keept reads at at least NETWORK_CHUNK_SIZE bytes terom@89: if (to_read < NETWORK_TCP_CHUNK_SIZE) terom@89: to_read = NETWORK_TCP_CHUNK_SIZE; terom@89: terom@89: // resize buffer if needed terom@89: resize(to_read); terom@89: terom@89: // read once terom@89: try { terom@89: ret = socket.recv(buf + offset, to_read); terom@89: terom@89: } catch (CL_Error &e) { terom@89: if (errno == EAGAIN) terom@89: return false; terom@89: terom@89: else terom@89: throw NetworkSocketOSError(socket, "recv"); terom@89: } terom@89: terom@89: // handle EOF terom@89: if (ret == 0) terom@89: throw NetworkSocketEOFError(socket, "recv"); terom@89: terom@89: assert(ret >= 0); terom@89: terom@89: // update offset terom@89: offset += ret; terom@89: terom@89: // did we get enough? terom@89: if ((unsigned int) ret < item_size) terom@89: return false; terom@89: else terom@89: return true; terom@89: } terom@89: terom@89: bool NetworkBuffer::peek_prefix (uint16_t &ref) { terom@89: if (offset < sizeof(uint16_t)) terom@89: return false; terom@89: terom@89: ref = ntohs(*((uint16_t *) (buf))); terom@89: terom@89: return true; terom@89: } terom@89: terom@89: bool NetworkBuffer::peek_prefix (uint32_t &ref) { terom@89: if (offset < sizeof(uint32_t)) terom@89: return false; terom@89: terom@89: ref = ntohl(*((uint32_t *) (buf))); terom@89: terom@89: return true; terom@89: } terom@89: terom@89: template PrefixType NetworkBuffer::read_prefix (char *buf_ptr, size_t buf_max) { terom@89: PrefixType prefix = 0; terom@89: size_t missing = 0; terom@89: terom@89: do { terom@89: // do we have the prefix? terom@89: if (peek_prefix(prefix)) { terom@89: // do we already have the payload? terom@89: if (offset >= sizeof(PrefixType) + prefix) { terom@89: break; terom@89: terom@89: } else { terom@89: missing = (sizeof(PrefixType) + prefix) - offset; terom@89: } terom@89: terom@89: } else { terom@89: missing = sizeof(PrefixType); terom@89: } terom@89: terom@89: // sanity-check terom@89: assert(missing); terom@89: terom@89: // try and read the missing data terom@89: if (try_read(missing) == false) { terom@89: // if unable to read what we need, return zero. terom@89: return 0; terom@89: } terom@89: terom@89: // assess the situation again terom@89: } while (true); terom@89: terom@89: // copy the data over, unless it's too large terom@89: if (prefix <= buf_max) { terom@89: // ...don't copy the prefix, though terom@89: memcpy(buf_ptr, buf + sizeof(PrefixType), prefix); terom@89: terom@89: // trim the bytes out terom@89: trim(sizeof(PrefixType) + prefix); terom@89: terom@89: // return terom@89: return prefix; terom@89: terom@89: } else { terom@89: // trim the bytes out terom@89: trim(sizeof(PrefixType) + prefix); terom@89: terom@89: throw NetworkBufferError("recv prefix overflow"); terom@89: } terom@89: } terom@89: terom@89: void NetworkBuffer::push_write (char *buf_ptr, size_t buf_size) { terom@89: int ret; terom@89: terom@89: // try and short-circuit writes unless we have already buffered data terom@89: if (offset == 0) { terom@89: try { terom@89: // attempt to send something terom@89: ret = socket.send(buf_ptr, buf_size); terom@89: terom@89: } catch (CL_Error &e) { terom@89: // ignore EAGAIN, detect this by setting ret to -1 terom@89: if (errno != EAGAIN) terom@89: throw NetworkSocketOSError(socket, "send"); terom@89: terom@89: ret = -1; terom@89: } terom@89: terom@89: // if we managed to send something, adjust buf/size and buffer terom@89: if (ret > 0) { terom@89: // sanity-check terom@89: assert(buf_size >= (unsigned int) ret); terom@89: terom@89: buf_ptr += ret; terom@89: buf_size -= ret; terom@89: terom@89: // if that was all, we're done terom@89: if (buf_size == 0) terom@89: return; terom@89: } terom@89: } terom@89: terom@89: // resize to fit buf_size more bytes terom@89: resize(buf_size); terom@89: terom@89: // copy into our internal buffer terom@89: memcpy(buf + offset, buf_ptr, buf_size); terom@89: } terom@89: terom@89: void NetworkBuffer::flush_write (void) { terom@89: int ret; terom@89: terom@89: // ignore if we don't have any data buffered terom@89: if (offset == 0) terom@89: return; terom@89: terom@89: // attempt to write as much as possible terom@89: try { terom@89: ret = socket.send(buf, offset); terom@89: terom@89: } catch (CL_Error &e) { terom@89: // ignore EAGAIN and just return terom@89: if (errno == EAGAIN) terom@89: return; terom@89: terom@89: else terom@89: throw NetworkSocketOSError(socket, "send"); terom@89: } terom@89: terom@89: // trim the buffer terom@89: trim(ret); terom@89: } terom@89: terom@89: void NetworkBuffer::write_prefix (char *buf, uint16_t prefix) { terom@89: uint16_t nval = htons(prefix); terom@89: terom@89: push_write((char*) &nval, sizeof(uint16_t)); terom@89: push_write(buf, prefix); terom@89: } terom@89: terom@89: void NetworkBuffer::write_prefix (char *buf, uint32_t prefix) { terom@89: uint32_t nval = htonl(prefix); terom@89: terom@89: push_write((char*) &nval, sizeof(uint32_t)); terom@89: push_write(buf, prefix); terom@89: } terom@89: terom@89: NetworkTCPTransport::NetworkTCPTransport (NetworkSocket socket) : terom@89: socket(socket), in(socket, NETWORK_TCP_INITIAL_IN_BUF), out(socket, NETWORK_TCP_INITIAL_OUT_BUF) { terom@89: terom@89: // connect signals terom@89: slots.connect(socket.sig_read_triggered(), this, &NetworkTCPTransport::on_read); terom@89: slots.connect(socket.sig_write_triggered(), this, &NetworkTCPTransport::on_write); terom@89: slots.connect(socket.sig_disconnected(), this, &NetworkTCPTransport::on_disconnected); terom@89: } terom@89: terom@89: terom@89: void NetworkTCPTransport::on_read (void) { terom@89: uint16_t prefix; terom@89: NetworkPacket packet; terom@89: terom@89: // let the in stream read length-prefixed packets and pass them on to handle_packet terom@89: while ((prefix = in.read_prefix(packet.get_buf(), packet.get_buf_size())) > 0) { terom@89: packet.set_data_size(prefix); terom@89: _sig_packet(packet); terom@89: } terom@89: } terom@89: terom@89: void NetworkTCPTransport::on_write (void) { terom@89: // just flush the output buffer terom@89: out.flush_write(); terom@89: } terom@89: terom@89: void NetworkTCPTransport::on_disconnected (void) { terom@89: // pass right through terom@89: _sig_disconnect(); terom@89: } terom@89: terom@89: void NetworkTCPTransport::write_packet (const NetworkPacket &packet) { terom@89: uint16_t prefix = packet.get_data_size(); terom@89: terom@89: if (prefix != packet.get_data_size()) terom@89: throw CL_Error("send prefix overflow"); terom@89: terom@89: try { terom@89: // just write to the output buffer terom@89: out.write_prefix((char *) packet.get_buf(), prefix); terom@89: terom@89: } catch (Error &e) { terom@89: const char *err = e.what(); terom@89: terom@89: Engine::log(ERROR, "tcp.write_packet") << err; terom@89: terom@89: throw; terom@89: } terom@89: } terom@89: terom@89: NetworkTCPServer::NetworkTCPServer (const NetworkAddress &listen_addr) : terom@89: socket(CL_Socket::tcp, CL_Socket::ipv4) { terom@89: terom@89: // bind terom@89: socket.bind(listen_addr); terom@89: terom@89: // assign slots terom@89: slots.connect(socket.sig_read_triggered(), this, &NetworkTCPServer::on_accept); terom@89: terom@89: // listen terom@89: socket.listen(NETWORK_LISTEN_BACKLOG); terom@89: terom@89: // use nonblocking sockets terom@89: socket.set_nonblocking(true); terom@89: } terom@89: terom@89: terom@89: void NetworkTCPServer::on_accept (void) { terom@89: // accept a new socket terom@89: NetworkSocket client_sock = socket.accept(); terom@89: terom@89: // create a new NetworkTCPTransport terom@89: NetworkTCPTransport *client = buildTransport(client_sock); terom@89: terom@89: // let our user handle it terom@89: _sig_client(client); terom@89: } terom@89: terom@89: NetworkTCPTransport* NetworkTCPServer::buildTransport (CL_Socket &socket) { terom@89: return new NetworkTCPTransport(socket); terom@89: } terom@89: terom@89: NetworkTCPClient::NetworkTCPClient (const NetworkAddress &connect_addr) : terom@89: NetworkTCPTransport(NetworkSocket(CL_Socket::tcp, CL_Socket::ipv4)) { terom@89: terom@89: // connect terom@89: socket.connect(connect_addr); terom@89: terom@89: // use nonblocking sockets terom@89: socket.set_nonblocking(true); terom@89: }