#include "sock_tcp.h"
#include "log.h"
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <assert.h>
/*
* Our basic socket event handler for driving our callbacks
*/
static void sock_tcp_event_handler (evutil_socket_t fd, short what, void *arg)
{
struct sock_tcp *sock = arg;
(void) fd;
// invoke appropriate callback
sock_stream_invoke_callbacks(SOCK_TCP_BASE(sock), what);
}
/*
* Our sock_stream_methods.read method
*/
static err_t sock_tcp_read (struct sock_stream *base_sock, void *buf, size_t *len)
{
struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp);
int ret;
// read(), and detect non-EAGAIN or EOF
if ((ret = read(sock->fd, buf, *len)) < 0 && errno != EAGAIN)
// unexpected error
RETURN_SET_ERROR_ERRNO(SOCK_TCP_ERR(sock), ERR_READ);
else if (ret == 0)
// EOF
return SET_ERROR(SOCK_TCP_ERR(sock), ERR_READ_EOF);
if (ret < 0) {
// EAGAIN -> zero bytes
*len = 0;
} else {
// normal -> bytes read
*len = ret;
}
// ok
return SUCCESS;
}
/*
* Our sock_stream_methods.write method
*/
static err_t sock_tcp_write (struct sock_stream *base_sock, const void *buf, size_t *len)
{
struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp);
int ret;
// write(), and detect non-EAGAIN or EOF
if ((ret = write(sock->fd, buf, *len)) < 0 && errno != EAGAIN)
// unexpected error
RETURN_SET_ERROR_ERRNO(SOCK_TCP_ERR(sock), ERR_WRITE);
else if (ret == 0)
// EOF
return SET_ERROR(SOCK_TCP_ERR(sock), ERR_WRITE_EOF);
if (ret < 0) {
// EAGAIN -> zero bytes
*len = 0;
} else {
// normal -> bytes read
*len = ret;
}
return SUCCESS;
}
static err_t sock_tcp_event_init (struct sock_stream *base_sock)
{
struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp);
err_t err;
// set nonblocking
if ((err = sock_tcp_set_nonblock(sock, 1)))
return err;
// add ourselves as the event handler
if ((err = sock_tcp_init_ev(sock, &sock_tcp_event_handler, sock)))
return err;
// done
return SUCCESS;
}
static err_t sock_tcp_event_enable (struct sock_stream *base_sock, short mask)
{
struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp);
// implemented in sock_tcp_add_event
return sock_tcp_add_event(sock, mask);
}
static void sock_tcp_release (struct sock_stream *base_sock)
{
struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp);
// close and free
sock_tcp_close(sock);
sock_tcp_free(sock);
}
/*
* Our sock_stream_type
*/
static struct sock_stream_type sock_tcp_type = {
.methods = {
.read = &sock_tcp_read,
.write = &sock_tcp_write,
.event_init = &sock_tcp_event_init,
.event_enable = &sock_tcp_event_enable,
.release = &sock_tcp_release,
},
};
err_t sock_tcp_alloc (struct sock_tcp **sock_ptr)
{
// alloc
if ((*sock_ptr = calloc(1, sizeof(**sock_ptr))) == NULL)
return ERR_CALLOC;
// initialize base with sock_tcp_type
sock_stream_init(SOCK_TCP_BASE(*sock_ptr), &sock_tcp_type);
// invalid fds are <0
(*sock_ptr)->fd = -1;
// done
return SUCCESS;
}
err_t sock_tcp_init_fd (struct sock_tcp *sock, int fd)
{
// valid fd
assert(fd >= 0);
// initialize
sock->fd = fd;
// done
return SUCCESS;
}
err_t sock_tcp_init_ev (struct sock_tcp *sock, void (*ev_cb)(evutil_socket_t, short, void *), void *cb_arg)
{
// require valid fd
assert(sock->fd >= 0);
// this is initialization
assert(sock->ev_read == NULL && sock->ev_write == NULL);
// create new event
if ((sock->ev_read = event_new(_sock_stream_ctx.ev_base, sock->fd, EV_READ, ev_cb, cb_arg)) == NULL)
return SET_ERROR(SOCK_TCP_ERR(sock), ERR_EVENT_NEW);
if ((sock->ev_write = event_new(_sock_stream_ctx.ev_base, sock->fd, EV_WRITE, ev_cb, cb_arg)) == NULL)
return SET_ERROR(SOCK_TCP_ERR(sock), ERR_EVENT_NEW);
// ok
return SUCCESS;
}
void sock_tcp_deinit_ev (struct sock_tcp *sock)
{
if (sock->ev_read) {
event_free(sock->ev_read);
sock->ev_read = NULL;
}
if (sock->ev_write) {
event_free(sock->ev_write);
sock->ev_write = NULL;
}
}
err_t sock_tcp_init_socket (struct sock_tcp *sock, struct addrinfo *addr, struct error_info *err)
{
// must not be set already
assert(sock->fd < 0);
// call socket
if ((sock->fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol)) < 0)
RETURN_SET_ERROR_ERRNO(err, ERR_SOCKET);
// ok
return SUCCESS;
}
err_t sock_tcp_add_event (struct sock_tcp *sock, short mask)
{
// just add the appropraite events
if (mask & EV_READ && event_add(sock->ev_read, NULL))
return SET_ERROR(SOCK_TCP_ERR(sock), ERR_EVENT_ADD);
if (mask & EV_WRITE && event_add(sock->ev_write, NULL))
return SET_ERROR(SOCK_TCP_ERR(sock), ERR_EVENT_ADD);
// done
return SUCCESS;
}
/**
* Attempt to connect to the given addrinfo, or the next one, if that fails, etc.
*/
static err_t sock_tcp_connect_async_continue (struct sock_tcp *sock, struct addrinfo *addr, struct error_info *err)
{
// no more addresses left?
if (!addr)
// XXX: rename error
return SET_ERROR(err, ERR_GETADDRINFO_EMPTY);
// try and connect to each one until we find one that works
do {
// attempt to start connect
if (sock_tcp_connect_async_addr(sock, addr, err) == SUCCESS)
break;
// try the next one
log_warn("sock_tcp_connect_async_addr(%s): %s", addr->ai_canonname, error_msg(err));
} while ((addr = addr->ai_next));
if (addr) {
// we succesfully did a sock_tcp_connect_async_addr on valid address
return SUCCESS;
} else {
// all of the connect_async_addr's failed, return the last error
return ERROR_CODE(err);
}
}
/**
* Our async connect operation has completed, clean up addrinfos and events, and call the user callback. The given
* \a err should be NULL for successful completion, or the error for unsuccesfully completion.
*/
static void sock_tcp_connect_async_done (struct sock_tcp *sock, struct error_info *err)
{
// free the addrinfo
freeaddrinfo(sock->async_res);
sock->async_res = sock->async_cur = NULL;
// remove our event handler so the user can install their own
sock_tcp_deinit_ev(sock);
// ok, run callback
SOCK_TCP_BASE(sock)->conn_cb_func(SOCK_TCP_BASE(sock), err, SOCK_TCP_BASE(sock)->conn_cb_arg);
}
/**
* Our start_connect callback
*/
static void sock_tcp_connect_cb (int fd, short what, void *arg)
{
struct sock_tcp *sock = arg;
int optval;
socklen_t optlen;
struct error_info err;
err_t tmp;
// XXX: timeouts
(void) what;
// init params
optval = 0;
optlen = sizeof(optval);
// read error code
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &optval, &optlen))
JUMP_SET_ERROR_ERRNO(&err, ERR_GETSOCKOPT);
// sanity-check optlen... not sure if this is sensible
if (optlen != sizeof(optval))
JUMP_SET_ERROR_EXTRA(&err, ERR_GETSOCKOPT, EINVAL);
// did the connect complete succesfully or not?
if (optval)
JUMP_SET_ERROR_EXTRA(&err, ERR_CONNECT, optval);
// done
return sock_tcp_connect_async_done(sock, NULL);
error:
// close the socket
if ((tmp = sock_tcp_close(sock)))
log_warn("error closing socket after connect error: %s", error_name(tmp));
// log a warning
log_warn("connect to '%s' failed: %s", sock->async_cur->ai_canonname, error_msg(&err));
// try the next one or fail completely
if (sock_tcp_connect_async_continue(sock, sock->async_cur->ai_next, &err))
sock_tcp_connect_async_done(sock, &err);
}
err_t sock_tcp_connect_async_addr (struct sock_tcp *sock, struct addrinfo *addr, struct error_info *err)
{
int ret;
err_t tmp;
// first, create the socket
if (sock_tcp_init_socket(sock, addr, err))
return ERROR_CODE(err);
// then, set it up as nonblocking
if ((ERROR_CODE(err) = sock_tcp_set_nonblock(sock, true)))
goto error;
// then, initiate the connect
if ((ret = connect(sock->fd, addr->ai_addr, addr->ai_addrlen)) < 0 && errno != EINPROGRESS)
JUMP_SET_ERROR_ERRNO(err, ERR_CONNECT);
if (ret < 0) {
// ok, connect started, setup our completion callback
if ((ERROR_CODE(err) = sock_tcp_init_ev(sock, &sock_tcp_connect_cb, sock)))
goto error;
// enable for write
if ((ERROR_CODE(err) = sock_tcp_add_event(sock, EV_WRITE)))
goto error;
// set the "current" address in case it fails and we need to try the next one
sock->async_cur = addr;
} else {
// oops... blocking connect - fail to avoid confusion
// XXX: come up with a better error name to use
JUMP_SET_ERROR_EXTRA(err, ERR_CONNECT, EINPROGRESS);
}
// ok
return SUCCESS;
error:
// close the stuff we did open
if ((tmp = sock_tcp_close(sock)))
log_warn("error closing socket after connect error: %s", error_name(tmp));
return ERROR_CODE(err);
}
err_t sock_tcp_connect_async_begin (struct sock_tcp *sock, const char *hostname, const char *service, struct error_info *err)
{
struct addrinfo hints;
int ret;
// hints
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
// resolve
if ((ret = getaddrinfo(hostname, service, &hints, &sock->async_res)))
RETURN_SET_ERROR_EXTRA(err, ERR_GETADDRINFO, ret);
// start connecting
return sock_tcp_connect_async_continue(sock, sock->async_res, err);
}
err_t sock_tcp_connect_blocking (struct sock_tcp *sock, const char *hostname, const char *service, struct error_info *err)
{
struct addrinfo hints, *res, *r;
int ret;
// zero error code
RESET_ERROR(err);
// hints
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
// resolve
if ((ret = getaddrinfo(hostname, service, &hints, &res)))
RETURN_SET_ERROR_EXTRA(err, ERR_GETADDRINFO, ret);
// try each result in turn
for (r = res; r; r = r->ai_next) {
// create the socket
if ((sock->fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) < 0) {
// remember error
SET_ERROR_ERRNO(err, ERR_SOCKET);
// skip to next one
continue;
}
// connect to remote address
if (connect(sock->fd, r->ai_addr, r->ai_addrlen)) {
// remember error
SET_ERROR_ERRNO(err, ERR_CONNECT);
// close/invalidate socket
close(sock->fd);
sock->fd = -1;
// skip to next one
continue;
}
// valid socket, use this
break;
}
// ensure we got some valid socket, else return last error code
if (sock->fd < 0) {
// did we hit some error?
if (IS_ERROR(err))
// return last error
return ERROR_CODE(err);
else
// no results
return SET_ERROR(err, ERR_GETADDRINFO_EMPTY);
}
// ok, done
return 0;
}
err_t sock_tcp_set_nonblock (struct sock_tcp *sock, bool nonblock)
{
// fcntl it
// XXX: maintain old flags?
if (fcntl(sock->fd, F_SETFL, nonblock ? O_NONBLOCK : 0) < 0)
RETURN_SET_ERROR_ERRNO(SOCK_TCP_ERR(sock), ERR_FCNTL);
// ok
return SUCCESS;
}
err_t sock_tcp_close (struct sock_tcp *sock)
{
struct error_info *err = SOCK_TCP_ERR(sock);
// no errors yet
RESET_ERROR(err);
// must be connected
assert(sock->fd >= 0);
// kill any events
sock_tcp_deinit_ev(sock);
// close the socket itself
if (close(sock->fd))
SET_ERROR_ERRNO(err, ERR_CLOSE);
// invalidate
sock->fd = -1;
return ERROR_CODE(err);
}
void sock_tcp_free (struct sock_tcp *sock)
{
// must not be connected
assert(sock->fd < 0);
// free
free(sock);
}
err_t sock_tcp_connect (struct sock_stream **sock_ptr, const char *host, const char *service, struct error_info *err)
{
struct sock_tcp *sock;
// allocate
if ((ERROR_CODE(err) = sock_tcp_alloc(&sock)))
return ERROR_CODE(err);
// connect
if (sock_tcp_connect_blocking(sock, host, service, err))
goto error;
// good
*sock_ptr = SOCK_TCP_BASE(sock);
return 0;
error:
// cleanup
sock_tcp_free(sock);
// return error code
return ERROR_CODE(err);
}
err_t sock_tcp_connect_async (struct sock_stream **sock_ptr, const char *host, const char *service,
sock_stream_connect_cb cb_func, void *cb_arg, struct error_info *err)
{
struct sock_tcp *sock;
// allocate
if ((ERROR_CODE(err) = sock_tcp_alloc(&sock)))
return ERROR_CODE(err);
// store the callbacks
SOCK_TCP_BASE(sock)->conn_cb_func = cb_func;
SOCK_TCP_BASE(sock)->conn_cb_arg = cb_arg;
// connect
if (sock_tcp_connect_async_begin(sock, host, service, err))
goto error;
// good
*sock_ptr = SOCK_TCP_BASE(sock);
return 0;
error:
// cleanup
sock_tcp_free(sock);
// return error code
return ERROR_CODE(err);
}