terom@185: terom@186: #include "TCP.hh" terom@186: #include "../Engine.hh" terom@185: terom@200: /* terom@200: * NetworkTCPTransport terom@200: */ terom@378: NetworkTCPTransport::NetworkTCPTransport (NetworkSocket *socket) : terom@185: socket(socket), in(socket, NETWORK_TCP_INITIAL_IN_BUF), out(socket, NETWORK_TCP_INITIAL_OUT_BUF) { terom@185: terom@185: // connect signals terom@378: slots.connect(socket->sig_read(), this, &NetworkTCPTransport::on_read); terom@378: slots.connect(socket->sig_write(), this, &NetworkTCPTransport::on_write); terom@380: terom@380: // activate read polling, but leave write polling for later terom@380: socket->set_poll_read(true); terom@185: } terom@378: terom@378: NetworkTCPTransport::~NetworkTCPTransport (void) { terom@378: // release socket terom@378: delete socket; terom@378: } terom@185: terom@185: void NetworkTCPTransport::on_read (void) { terom@203: uint32_t length; terom@200: char *buf_ptr; terom@185: terom@227: try { terom@227: // let the in stream read length-prefixed packets and pass them on to handle_packet terom@227: do { terom@227: // not enough data -> return and wait terom@227: if (in.peek_data(length, buf_ptr) == false) terom@227: break; terom@200: terom@227: // allocate the NetworkPacketBuffer with the given buf_ptr/length terom@227: NetworkPacketBuffer packet(buf_ptr, length, length); terom@227: terom@227: // pass the packet on terom@227: _sig_packet(packet); terom@227: terom@227: // flush it terom@227: in.flush_data(); terom@227: } while (true); terom@227: terom@227: } catch (NetworkSocketError &e) { terom@227: // log and disconnect terom@227: Engine::log(ERROR, "tcp.on_read") << "socket error: " << e.what(); terom@227: terom@380: handle_disconnect(); terom@185: } terom@185: } terom@185: terom@185: void NetworkTCPTransport::on_write (void) { terom@227: try { terom@380: // just flush the output buffer, and deactivate output polling if done terom@380: if (!out.flush_write()) terom@380: socket->set_poll_write(false); terom@227: terom@227: } catch (NetworkSocketError &e) { terom@227: // log and disconnect terom@227: Engine::log(ERROR, "tcp.on_write") << "socket error: " << e.what(); terom@227: terom@380: handle_disconnect(); terom@227: } terom@185: } terom@185: terom@200: void NetworkTCPTransport::write_packet (const NetworkPacketBuffer &packet) { terom@203: uint32_t prefix = packet.get_data_size(); terom@185: terom@185: if (prefix != packet.get_data_size()) terom@185: throw CL_Error("send prefix overflow"); terom@185: terom@185: try { terom@380: // just write to the output buffer, but activate output polling if needed terom@380: if (out.write_prefix((char *) packet.get_buf(), prefix)) terom@380: socket->set_poll_write(true); terom@185: terom@227: } catch (NetworkSocketError &e) { terom@185: const char *err = e.what(); terom@185: terom@185: Engine::log(ERROR, "tcp.write_packet") << err; terom@185: terom@263: // XXX: these are not handled anywhere :( terom@185: throw; terom@185: } terom@185: } terom@380: terom@380: void NetworkTCPTransport::handle_disconnect (void) { terom@380: // disable events on our socket terom@380: socket->set_poll_read(false); terom@380: socket->set_poll_write(false); terom@380: terom@380: // signal terom@380: _sig_disconnect(); terom@380: } terom@185: terom@378: /* terom@378: * NetworkTCPServer terom@378: */ terom@185: NetworkTCPServer::NetworkTCPServer (const NetworkAddress &listen_addr) : terom@378: socket(AF_UNSPEC, SOCK_STREAM) { terom@246: terom@185: // bind terom@185: socket.bind(listen_addr); terom@185: terom@185: // assign slots terom@378: slots.connect(socket.sig_read(), this, &NetworkTCPServer::on_accept); terom@185: terom@185: // listen terom@185: socket.listen(NETWORK_LISTEN_BACKLOG); terom@185: terom@185: // use nonblocking sockets terom@185: socket.set_nonblocking(true); terom@380: terom@380: // activate polling terom@380: socket.set_poll_read(true); terom@185: } terom@185: terom@185: void NetworkTCPServer::on_accept (void) { terom@185: // accept a new socket terom@378: NetworkSocket *client_sock = socket.accept(NULL); terom@185: terom@185: // create a new NetworkTCPTransport terom@185: NetworkTCPTransport *client = buildTransport(client_sock); terom@185: terom@185: // let our user handle it terom@185: _sig_client(client); terom@185: } terom@185: terom@378: NetworkTCPTransport* NetworkTCPServer::buildTransport (NetworkSocket *socket) { terom@185: return new NetworkTCPTransport(socket); terom@185: } terom@185: terom@378: /* terom@378: * NetworkTCPClient terom@378: */ terom@185: NetworkTCPClient::NetworkTCPClient (const NetworkAddress &connect_addr) : terom@378: NetworkTCPTransport(new NetworkSocket(AF_UNSPEC, SOCK_STREAM)) { terom@246: terom@185: // connect terom@378: socket->connect(connect_addr); terom@185: terom@185: // use nonblocking sockets terom@378: socket->set_nonblocking(true); terom@185: }