src/sock_tcp.c
author Tero Marttila <terom@fixme.fi>
Wed, 01 Apr 2009 19:31:11 +0300
changeset 109 bfe9b9a8fe5b
parent 85 75bc8b164ef8
child 117 9cb405164250
permissions -rw-r--r--
fix some more valgrind errors

#include "sock_tcp.h"
#include "log.h"

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Event callback that sock_tcp_event_init() installs via sock_tcp_init_ev(); it simply forwards the triggered
 * event mask to the generic sock_stream callbacks.
 */
static void sock_tcp_event_handler (evutil_socket_t fd, short what, void *arg)
{
    struct sock_tcp *tcp = arg;
    struct sock_stream *stream = SOCK_TCP_BASE(tcp);

    // the socket is identified by the callback argument, so the fd itself is unused
    (void) fd;

    // dispatch to whatever callbacks match the event mask
    sock_stream_invoke_callbacks(stream, what);
}

/*
 * Our sock_stream_methods.read method.
 *
 * Attempts to read up to *len bytes from the socket into buf. On SUCCESS, *len is updated to the number of bytes
 * actually read, which is zero if read() failed with EAGAIN.
 *
 * Any other read() failure is reported through RETURN_SET_ERROR_ERRNO with ERR_READ, and a read() that returns
 * zero is reported as ERR_READ_EOF. In both error cases *len is left untouched.
 */
static err_t sock_tcp_read (struct sock_stream *base_sock, void *buf, size_t *len)
{
    struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp);
    ssize_t ret;

    // read() returns a ssize_t; keeping it in an int would narrow the byte count
    ret = read(sock->fd, buf, *len);

    if (ret < 0 && errno != EAGAIN) {
        // unexpected error
        RETURN_SET_ERROR_ERRNO(SOCK_TCP_ERR(sock), ERR_READ);
    }

    if (ret == 0)
        // EOF
        return SET_ERROR(SOCK_TCP_ERR(sock), ERR_READ_EOF);

    // EAGAIN -> zero bytes, otherwise the number of bytes read
    *len = (ret < 0) ? 0 : (size_t) ret;

    // ok
    return SUCCESS;
}

/*
 * Our sock_stream_methods.write method.
 *
 * Attempts to write up to *len bytes from buf to the socket. On SUCCESS, *len is updated to the number of bytes
 * actually written, which is zero if write() failed with EAGAIN.
 *
 * Any other write() failure is reported through RETURN_SET_ERROR_ERRNO with ERR_WRITE, and a write() that returns
 * zero is reported as ERR_WRITE_EOF. In both error cases *len is left untouched.
 */
static err_t sock_tcp_write (struct sock_stream *base_sock, const void *buf, size_t *len)
{
    struct sock_tcp *sock = SOCK_FROM_BASE(base_sock, struct sock_tcp);
    ssize_t ret;

    // write() returns a ssize_t; keeping it in an int would narrow the byte count
    ret = write(sock->fd, buf, *len);

    if (ret < 0 && errno != EAGAIN) {
        // unexpected error
        RETURN_SET_ERROR_ERRNO(SOCK_TCP_ERR(sock), ERR_WRITE);
    }

    if (ret == 0)
        // EOF
        return SET_ERROR(SOCK_TCP_ERR(sock), ERR_WRITE_EOF);

    // EAGAIN -> zero bytes, otherwise the number of bytes written
    *len = (ret < 0) ? 0 : (size_t) ret;

    // ok
    return SUCCESS;
}

/*
 * Our sock_stream_methods.event_init method: switches the socket to non-blocking mode and then creates its
 * read/write events with sock_tcp_event_handler as the callback. Returns the first error hit, else SUCCESS.
 */
static err_t sock_tcp_event_init (struct sock_stream *base_sock)
{
    struct sock_tcp *tcp = SOCK_FROM_BASE(base_sock, struct sock_tcp);
    err_t status;

    // non-blocking mode first
    status = sock_tcp_set_nonblock(tcp, 1);

    if (status)
        return status;

    // then the events, with ourselves as the callback argument
    status = sock_tcp_init_ev(tcp, &sock_tcp_event_handler, tcp);

    if (status)
        return status;

    return SUCCESS;
}

/*
 * Our sock_stream_methods.event_enable method: a thin adapter that recovers the sock_tcp from its base and
 * hands the event mask to sock_tcp_add_event.
 */
static err_t sock_tcp_event_enable (struct sock_stream *base_sock, short mask)
{
    return sock_tcp_add_event(SOCK_FROM_BASE(base_sock, struct sock_tcp), mask);
}

/*
 * Our sock_stream_methods.release method: closes the socket and then frees the sock_tcp itself.
 */
static void sock_tcp_release (struct sock_stream *base_sock)
{
    struct sock_tcp *tcp = SOCK_FROM_BASE(base_sock, struct sock_tcp);

    // the result of the close is not inspected here
    sock_tcp_close(tcp);

    // release the memory once the fd has been invalidated by the close
    sock_tcp_free(tcp);
}

/*
 * Our sock_stream_type: the method table that sock_tcp_alloc() hands to sock_stream_init() for every sock_tcp,
 * wiring the sock_stream operations to the static sock_tcp_* implementations in this file.
 */
static struct sock_stream_type sock_tcp_type = {
    .methods                = {
        .read               = &sock_tcp_read,
        .write              = &sock_tcp_write,
        .event_init         = &sock_tcp_event_init,
        .event_enable       = &sock_tcp_event_enable,
        .release            = &sock_tcp_release,
    },
};

/*
 * Allocate a zeroed sock_tcp, initialize its sock_stream base with sock_tcp_type and mark its fd as invalid.
 *
 * The new socket (or NULL on allocation failure) is stored in *sock_ptr. Returns ERR_CALLOC if calloc fails,
 * SUCCESS otherwise.
 */
err_t sock_tcp_alloc (struct sock_tcp **sock_ptr)
{
    struct sock_tcp *fresh = calloc(1, sizeof(*fresh));

    // the caller's pointer is always written, even when the allocation failed
    *sock_ptr = fresh;

    if (fresh == NULL)
        return ERR_CALLOC;

    // hook up the base with our method table
    sock_stream_init(SOCK_TCP_BASE(fresh), &sock_tcp_type);

    // negative fds mean "no socket yet"
    fresh->fd = -1;

    return SUCCESS;
}

/*
 * Store an already-open file descriptor in the socket. The fd must be non-negative (asserted).
 * Always returns SUCCESS.
 */
err_t sock_tcp_init_fd (struct sock_tcp *sock, int fd)
{
    // negative values are reserved for "no socket"
    assert(!(fd < 0));

    sock->fd = fd;

    return SUCCESS;
}

/*
 * Create the socket's read and write events on _sock_stream_ctx.ev_base, both using the given callback and
 * callback argument. The events are only created here, not added.
 *
 * The socket must have a valid fd and must not have any events yet (both asserted).
 *
 * Returns ERR_EVENT_NEW (set on the socket's error_info) if either event cannot be created; in that case neither
 * ev_read nor ev_write is left set, so the call can be retried. Returns SUCCESS otherwise.
 */
err_t sock_tcp_init_ev (struct sock_tcp *sock, void (*ev_cb)(evutil_socket_t, short, void *), void *cb_arg)
{
    // require valid fd
    assert(sock->fd >= 0);

    // this is initialization
    assert(sock->ev_read == NULL && sock->ev_write == NULL);

    // create the read event
    if ((sock->ev_read = event_new(_sock_stream_ctx.ev_base, sock->fd, EV_READ, ev_cb, cb_arg)) == NULL)
        return SET_ERROR(SOCK_TCP_ERR(sock), ERR_EVENT_NEW);

    // create the write event
    if ((sock->ev_write = event_new(_sock_stream_ctx.ev_base, sock->fd, EV_WRITE, ev_cb, cb_arg)) == NULL) {
        // don't leave a half-initialized socket behind: drop the read event we just created
        event_free(sock->ev_read);
        sock->ev_read = NULL;

        return SET_ERROR(SOCK_TCP_ERR(sock), ERR_EVENT_NEW);
    }

    // ok
    return SUCCESS;
}

/*
 * Free whichever of the socket's read/write events exist and reset the pointers to NULL.
 * Events that were never created are skipped.
 */
void sock_tcp_deinit_ev (struct sock_tcp *sock)
{
    // same treatment for both events, read first
    struct event **slots[2] = { &sock->ev_read, &sock->ev_write };

    for (size_t i = 0; i < 2; i++) {
        if (*slots[i] == NULL)
            continue;

        event_free(*slots[i]);
        *slots[i] = NULL;
    }
}

/*
 * Create a new socket() matching the given addrinfo's family/socktype/protocol and store it in sock->fd.
 *
 * The socket must not have an fd yet (asserted). On failure the negative socket() result is still stored in
 * sock->fd and the error is reported through RETURN_SET_ERROR_ERRNO on err with ERR_SOCKET.
 */
err_t sock_tcp_init_socket (struct sock_tcp *sock, struct addrinfo *addr, struct error_info *err)
{
    int new_fd;

    // refuse to overwrite an existing fd
    assert(sock->fd < 0);

    new_fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
    sock->fd = new_fd;

    if (new_fd < 0)
        RETURN_SET_ERROR_ERRNO(err, ERR_SOCKET);

    return SUCCESS;
}

/*
 * Add the socket's read and/or write event (without a timeout), as selected by the EV_READ / EV_WRITE bits
 * in mask. Returns ERR_EVENT_ADD (set on the socket's error_info) if an event_add fails, else SUCCESS.
 */
err_t sock_tcp_add_event (struct sock_tcp *sock, short mask)
{
    // read side first
    if (mask & EV_READ) {
        if (event_add(sock->ev_read, NULL) != 0)
            return SET_ERROR(SOCK_TCP_ERR(sock), ERR_EVENT_ADD);
    }

    // then the write side
    if (mask & EV_WRITE) {
        if (event_add(sock->ev_write, NULL) != 0)
            return SET_ERROR(SOCK_TCP_ERR(sock), ERR_EVENT_ADD);
    }

    return SUCCESS;
}

/**
 * Attempt to start an async connect to the given addrinfo, or the next one, if that fails, etc.
 *
 * Returns SUCCESS as soon as sock_tcp_connect_async_addr succeeds for some address in the list. If \a addr is
 * NULL, returns ERR_GETADDRINFO_EMPTY; if every address fails, returns the error code left in \a err by the last
 * attempt. Each failed attempt is logged as a warning.
 */
static err_t sock_tcp_connect_async_continue (struct sock_tcp *sock, struct addrinfo *addr, struct error_info *err)
{
    // no more addresses left?
    if (!addr)
        // XXX: rename error
        return SET_ERROR(err, ERR_GETADDRINFO_EMPTY);

    // try and connect to each one until we find one that works
    for (; addr; addr = addr->ai_next) {
        // attempt to start connect
        if (sock_tcp_connect_async_addr(sock, addr, err) == SUCCESS)
            return SUCCESS;

        // ai_canonname is NULL unless AI_CANONNAME was requested, and NULL must not be passed to a %s
        log_warn("sock_tcp_connect_async_addr(%s): %s",
                addr->ai_canonname ? addr->ai_canonname : "(unknown)", error_msg(err));
    }

    // all of the connect_async_addr's failed, return the last error
    return ERROR_CODE(err);
}

/**
 * Our async connect operation has completed, clean up addrinfos and events, and call the user callback. The given
 * \a err should be NULL for successful completion, or the error for unsuccesfully completion.
 */
static void sock_tcp_connect_async_done (struct sock_tcp *sock, struct error_info *err)
{
    // free the addrinfo
    freeaddrinfo(sock->async_res); 
    sock->async_res = sock->async_cur = NULL;

    // remove our event handler so the user can install their own
    sock_tcp_deinit_ev(sock);

    // ok, run callback
    SOCK_TCP_BASE(sock)->conn_cb_func(SOCK_TCP_BASE(sock), err, SOCK_TCP_BASE(sock)->conn_cb_arg);
}

/**
 * Our connect-completion callback, installed by sock_tcp_connect_async_addr on the socket's events with the
 * sock_tcp as \a arg.
 *
 * Reads SO_ERROR from the socket to find out how the connect went. On success, the operation is completed via
 * sock_tcp_connect_async_done with a NULL error. On failure, the socket is closed, the failure is logged, and the
 * next address after sock->async_cur is tried; if there is no next address, or none of the remaining ones can be
 * started, the operation is completed with the error.
 */
static void sock_tcp_connect_cb (int fd, short what, void *arg)
{
    struct sock_tcp *sock = arg;
    struct addrinfo *next;
    const char *name;
    int optval;
    socklen_t optlen;
    struct error_info err;
    err_t tmp;

    // XXX: timeouts
    (void) what;

    // init params
    optval = 0;
    optlen = sizeof(optval);

    // read error code
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &optval, &optlen))
        JUMP_SET_ERROR_ERRNO(&err, ERR_GETSOCKOPT);
    
    // sanity-check optlen... not sure if this is sensible
    if (optlen != sizeof(optval))
        JUMP_SET_ERROR_EXTRA(&err, ERR_GETSOCKOPT, EINVAL);
        
    // did the connect complete succesfully or not?
    if (optval)
        JUMP_SET_ERROR_EXTRA(&err, ERR_CONNECT, optval);
    
    // done; a void function may not `return` an expression in ISO C, so call and return separately
    sock_tcp_connect_async_done(sock, NULL);
    return;

error:
    // close the socket
    if ((tmp = sock_tcp_close(sock)))
        log_warn("error closing socket after connect error: %s", error_name(tmp));

    // ai_canonname is NULL unless AI_CANONNAME was requested, and NULL must not be passed to a %s
    name = sock->async_cur->ai_canonname ? sock->async_cur->ai_canonname : "(unknown)";

    // log a warning
    log_warn("connect to '%s' failed: %s", name, error_msg(&err));

    // try the next one or fail completely. If this was the last address, report the connect error itself
    // instead of letting sock_tcp_connect_async_continue replace it with ERR_GETADDRINFO_EMPTY
    next = sock->async_cur->ai_next;

    if (next == NULL || sock_tcp_connect_async_continue(sock, next, &err))
        sock_tcp_connect_async_done(sock, &err);
}

err_t sock_tcp_connect_async_addr (struct sock_tcp *sock, struct addrinfo *addr, struct error_info *err)
{
    int ret;
    err_t tmp;

    // first, create the socket
    if (sock_tcp_init_socket(sock, addr, err))
        return ERROR_CODE(err);

    // then, set it up as nonblocking
    if ((ERROR_CODE(err) = sock_tcp_set_nonblock(sock, true)))
        goto error;

    // then, initiate the connect
    if ((ret = connect(sock->fd, addr->ai_addr, addr->ai_addrlen)) < 0 && errno != EINPROGRESS) 
        JUMP_SET_ERROR_ERRNO(err, ERR_CONNECT);
    
    if (ret < 0) {
        // ok, connect started, setup our completion callback
        if ((ERROR_CODE(err) = sock_tcp_init_ev(sock, &sock_tcp_connect_cb, sock)))
            goto error;
    
        // enable for write
        if ((ERROR_CODE(err) = sock_tcp_add_event(sock, EV_WRITE)))
            goto error;

        // set the "current" address in case it fails and we need to try the next one
        sock->async_cur = addr;

    } else {
        // oops... blocking connect - fail to avoid confusion
        // XXX: come up with a better error name to use
        JUMP_SET_ERROR_EXTRA(err, ERR_CONNECT, EINPROGRESS);
    }
    
    // ok
    return SUCCESS;

error:
    // close the stuff we did open
    if ((tmp = sock_tcp_close(sock)))
        log_warn("error closing socket after connect error: %s", error_name(tmp));

    return ERROR_CODE(err);
}

/*
 * Begin an async connect: resolve hostname/service (any address family, SOCK_STREAM) into sock->async_res and
 * start connecting to the first address that can be started.
 *
 * Returns ERR_GETADDRINFO (with the getaddrinfo code as the extra value) if resolving fails, or the error from
 * sock_tcp_connect_async_continue if no connect could be started. In the latter case the resolved address list is
 * freed again and sock->async_res / sock->async_cur are reset to NULL, since no completion callback will run to
 * release it.
 */
err_t sock_tcp_connect_async_begin (struct sock_tcp *sock, const char *hostname, const char *service, struct error_info *err)
{
    struct addrinfo hints;
    int ret;
    err_t code;

    // hints
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;

    // resolve
    if ((ret = getaddrinfo(hostname, service, &hints, &sock->async_res)))
        RETURN_SET_ERROR_EXTRA(err, ERR_GETADDRINFO, ret);
    
    // start connecting
    if ((code = sock_tcp_connect_async_continue(sock, sock->async_res, err))) {
        // nothing is pending, so nobody else would ever free the address list
        freeaddrinfo(sock->async_res);
        sock->async_res = sock->async_cur = NULL;
    }

    return code;
}

/*
 * Resolve hostname/service (any address family, SOCK_STREAM) and connect the socket synchronously, trying each
 * getaddrinfo() result in order until one connects.
 *
 * On success, sock->fd holds the connected socket and SUCCESS is returned. Note that err is only reset on entry,
 * so it may still describe an earlier failed attempt even though a later address connected.
 *
 * On failure, returns ERR_GETADDRINFO (with the getaddrinfo code as the extra value) if resolving failed, the
 * error recorded by the last failed attempt (ERR_SOCKET / ERR_CONNECT), or ERR_GETADDRINFO_EMPTY if no error was
 * recorded at all.
 */
err_t sock_tcp_connect_blocking (struct sock_tcp *sock, const char *hostname, const char *service, struct error_info *err)
{
    struct addrinfo hints, *res, *r;
    int ret;
    
    // zero error code
    RESET_ERROR(err);
    
    // hints
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;

    // resolve
    if ((ret = getaddrinfo(hostname, service, &hints, &res)))
        RETURN_SET_ERROR_EXTRA(err, ERR_GETADDRINFO, ret);

    // try each result in turn
    for (r = res; r; r = r->ai_next) {
        // create the socket
        if ((sock->fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) < 0) {
            // remember error
            SET_ERROR_ERRNO(err, ERR_SOCKET);

            // skip to next one
            continue;
        }
        
        // connect to remote address
        if (connect(sock->fd, r->ai_addr, r->ai_addrlen)) {
            // remember error
            SET_ERROR_ERRNO(err, ERR_CONNECT);
            
            // close/invalidate socket
            close(sock->fd);
            sock->fd = -1;

            // skip to next one
            continue;
        }
        
        // valid socket, use this
        break;
    }

    // the address list is not needed past this point, whether or not we connected
    freeaddrinfo(res);
    
    // ensure we got some valid socket, else return last error code
    if (sock->fd < 0) {
        // did we hit some error?
        if (IS_ERROR(err))
            // return last error
            return ERROR_CODE(err);
        
        else
            // no results
            return SET_ERROR(err, ERR_GETADDRINFO_EMPTY);
    }
    
    // ok, done
    return SUCCESS;
}

/*
 * Turn O_NONBLOCK on or off for the socket's fd, leaving all other file status flags as they are.
 *
 * If either fcntl call fails, the error is reported through RETURN_SET_ERROR_ERRNO on the socket's error_info
 * with ERR_FCNTL. Returns SUCCESS otherwise.
 */
err_t sock_tcp_set_nonblock (struct sock_tcp *sock, bool nonblock)
{
    int flags;

    // fetch the current flags, so that we only touch O_NONBLOCK
    if ((flags = fcntl(sock->fd, F_GETFL, 0)) < 0)
        RETURN_SET_ERROR_ERRNO(SOCK_TCP_ERR(sock), ERR_FCNTL);

    if (nonblock)
        flags |= O_NONBLOCK;
    else
        flags &= ~O_NONBLOCK;

    // write the updated flags back
    if (fcntl(sock->fd, F_SETFL, flags) < 0)
        RETURN_SET_ERROR_ERRNO(SOCK_TCP_ERR(sock), ERR_FCNTL);

    // ok
    return SUCCESS;
}

/*
 * Close the socket: free its events, close() the fd and mark the fd as invalid.
 *
 * The socket's error_info is reset first; the socket must have a valid fd (asserted). The fd is invalidated even
 * when close() fails. Returns the resulting error code, which is ERR_CLOSE if close() failed.
 */
err_t sock_tcp_close (struct sock_tcp *sock)
{
    struct error_info *err = SOCK_TCP_ERR(sock);
    int old_fd;

    // start from a clean slate
    RESET_ERROR(err);

    // there has to be something to close
    assert(sock->fd >= 0);

    // the events refer to the fd, so they go first
    sock_tcp_deinit_ev(sock);

    // take the fd out of the socket, then close it
    old_fd = sock->fd;
    sock->fd = -1;

    if (close(old_fd) != 0)
        SET_ERROR_ERRNO(err, ERR_CLOSE);

    return ERROR_CODE(err);
}

/*
 * Release the memory of a sock_tcp. The socket must not hold a valid fd anymore (asserted).
 */
void sock_tcp_free (struct sock_tcp *sock)
{
    // refuse to free a socket whose fd is still valid
    assert(!(sock->fd >= 0));

    free(sock);
}

/*
 * Allocate a new sock_tcp and connect it to host/service using the blocking connect.
 *
 * On success, the socket's sock_stream base is stored in *sock_ptr and zero is returned. On failure, the socket
 * is freed again, *sock_ptr is left untouched and the error code from err is returned.
 */
err_t sock_tcp_connect (struct sock_stream **sock_ptr, const char *host, const char *service, struct error_info *err)
{
    struct sock_tcp *tcp;

    // get a fresh socket
    ERROR_CODE(err) = sock_tcp_alloc(&tcp);

    if (ERROR_CODE(err))
        return ERROR_CODE(err);

    // connect it; on failure the socket is of no use to anyone
    if (sock_tcp_connect_blocking(tcp, host, service, err)) {
        sock_tcp_free(tcp);

        return ERROR_CODE(err);
    }

    // hand the connected socket to the caller
    *sock_ptr = SOCK_TCP_BASE(tcp);

    return 0;
}

/*
 * Allocate a new sock_tcp and begin an async connect to host/service, with cb_func/cb_arg stored on the socket's
 * sock_stream base as the connect callback.
 *
 * If the connect could be started, the socket's sock_stream base is stored in *sock_ptr and zero is returned.
 * Otherwise the socket is freed again, *sock_ptr is left untouched and the error code from err is returned.
 */
err_t sock_tcp_connect_async (struct sock_stream **sock_ptr, const char *host, const char *service, 
        sock_stream_connect_cb cb_func, void *cb_arg, struct error_info *err)
{
    struct sock_tcp *tcp;
    struct sock_stream *base;

    // get a fresh socket
    ERROR_CODE(err) = sock_tcp_alloc(&tcp);

    if (ERROR_CODE(err))
        return ERROR_CODE(err);

    // remember who to notify once the connect completes
    base = SOCK_TCP_BASE(tcp);
    base->conn_cb_func = cb_func;
    base->conn_cb_arg = cb_arg;

    // kick off the connect; on failure the socket is of no use to anyone
    if (sock_tcp_connect_async_begin(tcp, host, service, err)) {
        sock_tcp_free(tcp);

        return ERROR_CODE(err);
    }

    // hand the connecting socket to the caller
    *sock_ptr = base;

    return 0;
}