#include "TCP.hh"
#include "../Engine.hh"
/*
* NetworkTCPTransport
*/
// Builds a transport on top of an existing socket. The socket pointer is kept (and later
// deleted by the destructor), and the in/out stream buffers are constructed on the same
// socket with the NETWORK_TCP_INITIAL_IN_BUF / NETWORK_TCP_INITIAL_OUT_BUF constants.
NetworkTCPTransport::NetworkTCPTransport (NetworkSocket *socket) :
    socket(socket),
    in(socket, NETWORK_TCP_INITIAL_IN_BUF),
    out(socket, NETWORK_TCP_INITIAL_OUT_BUF)
{
    // hook the socket's read/write signals up to our handlers
    slots.connect(socket->sig_read(), this, &NetworkTCPTransport::on_read);
    slots.connect(socket->sig_write(), this, &NetworkTCPTransport::on_write);

    // only reads are polled from the start; write polling is switched on by write_packet()
    // when it is needed
    socket->set_poll_read(true);
}
// Destroys the transport together with the socket it was constructed with.
NetworkTCPTransport::~NetworkTCPTransport (void) {
    // the socket pointer handed to the constructor is freed here
    delete this->socket;
}
// Read handler, connected to the socket's sig_read() by the constructor.
// Pulls every complete uint32_t-length-prefixed packet out of the input stream, emits each
// one through _sig_packet and then drops it from the stream. A NetworkSocketError raised
// along the way is logged and turned into a disconnect.
void NetworkTCPTransport::on_read (void) {
    uint32_t length;
    char *buf_ptr;

    try {
        // peek_data() fails while a whole packet is not yet buffered; in that case stop and
        // wait for the next read event
        while (in.peek_data<uint32_t>(length, buf_ptr)) {
            // wrap the peeked payload in a packet buffer
            // NOTE(review): both size arguments receive the payload length - confirm their
            // meaning against NetworkPacketBuffer
            NetworkPacketBuffer packet(buf_ptr, length, length);

            // hand the packet to whoever listens
            _sig_packet(packet);

            // discard the packet from the stream only after it has been handled
            in.flush_data<uint32_t>();
        }

    } catch (NetworkSocketError &e) {
        // report the failure and drop the connection
        Engine::log(ERROR, "tcp.on_read") << "socket error: " << e.what();

        handle_disconnect();
    }
}
// Write handler, connected to the socket's sig_write() by the constructor.
// Flushes the output buffer and stops write polling once flush_write() reports that nothing
// is left. A NetworkSocketError is logged and turned into a disconnect.
void NetworkTCPTransport::on_write (void) {
    try {
        // a false result from flush_write() is taken to mean the buffer has been drained
        const bool still_pending = out.flush_write();

        if (!still_pending)
            socket->set_poll_write(false);

    } catch (NetworkSocketError &e) {
        // report the failure and drop the connection
        Engine::log(ERROR, "tcp.on_write") << "socket error: " << e.what();

        handle_disconnect();
    }
}
void NetworkTCPTransport::write_packet (const NetworkPacketBuffer &packet) {
uint32_t prefix = packet.get_data_size();
if (prefix != packet.get_data_size())
throw CL_Error("send prefix overflow");
try {
// just write to the output buffer, but activate output polling if needed
if (out.write_prefix((char *) packet.get_buf(), prefix))
socket->set_poll_write(true);
} catch (NetworkSocketError &e) {
const char *err = e.what();
Engine::log(ERROR, "tcp.write_packet") << err;
// XXX: these are not handled anywhere :(
throw;
}
}
void NetworkTCPTransport::handle_disconnect (void) {
// disable events on our socket
socket->set_poll_read(false);
socket->set_poll_write(false);
// signal
_sig_disconnect();
}
/*
* NetworkTCPServer
*/
// Creates a listening stream socket (AF_UNSPEC, SOCK_STREAM) bound to listen_addr and
// arranges for on_accept() to run whenever the socket signals readability.
NetworkTCPServer::NetworkTCPServer (const NetworkEndpoint &listen_addr) :
    socket(AF_UNSPEC, SOCK_STREAM)
{
    // socket setup: bind to the requested address, start listening, then go nonblocking
    socket.bind(listen_addr);
    socket.listen(NETWORK_LISTEN_BACKLOG);
    socket.set_nonblocking(true);

    // route read events on the listening socket to on_accept
    slots.connect(socket.sig_read(), this, &NetworkTCPServer::on_accept);

    // with the handler in place, start polling for read events
    socket.set_poll_read(true);
}
void NetworkTCPServer::on_accept (void) {
// accept a new socket
NetworkSocket *client_sock = socket.accept(NULL);
// make it nonblocking
client_sock->set_nonblocking(true);
// create a new NetworkTCPTransport
NetworkTCPTransport *client = buildTransport(client_sock);
// let our user handle it
_sig_client(client);
}
// Factory used by on_accept() to wrap a freshly accepted socket. Returns a heap-allocated
// NetworkTCPTransport constructed on the given socket.
NetworkTCPTransport* NetworkTCPServer::buildTransport (NetworkSocket *socket) {
    NetworkTCPTransport *transport = new NetworkTCPTransport(socket);

    return transport;
}
/*
* NetworkTCPClient
*/
// Client-side transport: allocates a new stream socket (AF_UNSPEC, SOCK_STREAM), passes it
// to the NetworkTCPTransport base, and connects it to connect_addr.
NetworkTCPClient::NetworkTCPClient (const NetworkEndpoint &connect_addr) :
    NetworkTCPTransport(new NetworkSocket(AF_UNSPEC, SOCK_STREAM))
{
    // connect first; the socket is switched to nonblocking mode only afterwards
    this->socket->connect(connect_addr);

    // from here on, use nonblocking I/O
    this->socket->set_nonblocking(true);
}