src/transport_fd.c
author Tero Marttila <terom@fixme.fi>
Wed, 27 May 2009 23:57:48 +0300
branch new-lib-errors
changeset 217 7728d6ec3abf
parent 182 471ca1e744da
permissions -rw-r--r--
nexus.c compiles
#include "transport_fd.h"

#include "log.h"

#include <fcntl.h>
#include <unistd.h>
#include <assert.h>

/**
 * libevent callback used for both the read and the write event.
 *
 * Translates the libevent EV_READ/EV_WRITE flags into TRANSPORT_READ/TRANSPORT_WRITE and passes them on to the
 * callback stored in the transport_fd given as `arg`.
 */
static void transport_fd_on_event (evutil_socket_t _fd, short ev_what, void *arg)
{
    struct transport_fd *self = arg;
    short flags = 0;

    // the raw fd is not needed here
    (void) _fd;

    // map libevent flags -> transport flags
    flags |= (ev_what & EV_READ) ? TRANSPORT_READ : 0;
    flags |= (ev_what & EV_WRITE) ? TRANSPORT_WRITE : 0;

    // hand off to the registered callback
    self->cb_func(self, flags, self->cb_arg);
}

/**
 * Our transport_methods implementations
 */
/**
 * transport_methods.read implementation: read() from the underlying fd.
 *
 * @param transport the transport, checked against transport_fd_type
 * @param buf buffer to read into
 * @param len in: number of bytes to read at most; out: number of bytes actually read, or zero if read() failed
 *            with EAGAIN
 * @param err returned error info, reset on entry
 * @return SUCCESS, the ERR_EOF error if read() returned zero, or the ERR_READ error built from errno
 */
err_t transport_fd__read (transport_t *transport, void *buf, size_t *len, error_t *err)
{
    struct transport_fd *fd = transport_check(transport, &transport_fd_type);
    ssize_t ret;    // read() returns ssize_t; storing it in an int could truncate large counts

    RESET_ERROR(err);

    ret = read(fd->fd, buf, *len);

    if (ret < 0 && errno != EAGAIN) {
        // unexpected error
        RETURN_SET_ERROR_ERRNO(err, ERR_READ);
    }

    if (ret == 0) {
        // EOF
        return SET_ERROR(err, ERR_EOF);
    }

    // EAGAIN -> zero bytes, otherwise the number of bytes read
    *len = (ret < 0) ? 0 : (size_t) ret;

    // ok
    return SUCCESS;
}

/**
 * transport_methods.write implementation: write() to the underlying fd.
 *
 * If write() fails with EAGAIN, zero bytes are reported, and the write event is enabled when the transport's
 * ev_mask includes TRANSPORT_WRITE.
 *
 * @param transport the transport, checked against transport_fd_type
 * @param buf data to write
 * @param len in: number of bytes to write; out: number of bytes actually written, or zero on EAGAIN
 * @param err returned error info, reset on entry
 * @return SUCCESS, the ERR_WRITE_EOF error if write() returned zero, the ERR_WRITE error built from errno, or
 *         the error code from transport_fd_enable()
 */
err_t transport_fd__write (transport_t *transport, const void *buf, size_t *len, struct error_info *err)
{
    struct transport_fd *fd = transport_check(transport, &transport_fd_type);
    ssize_t ret;    // write() returns ssize_t; storing it in an int could truncate large counts

    RESET_ERROR(err);

    ret = write(fd->fd, buf, *len);

    if (ret < 0 && errno != EAGAIN) {
        // unexpected error
        RETURN_SET_ERROR_ERRNO(err, ERR_WRITE);
    }

    if (ret == 0) {
        // EOF
        return SET_ERROR(err, ERR_WRITE_EOF);
    }

    if (ret < 0) {
        // EAGAIN -> zero bytes written
        *len = 0;

        if (transport->info.ev_mask & TRANSPORT_WRITE) {
            // enable the write event
            if ((ERROR_CODE(err) = transport_fd_enable(fd, TRANSPORT_WRITE)))
                return ERROR_CODE(err);
        }

    } else {
        // normal -> bytes written
        *len = (size_t) ret;
    }

    return SUCCESS;
}

/**
 * transport_methods.events implementation: apply a new event mask to the fd's events.
 *
 * TRANSPORT_READ is passed through as requested. TRANSPORT_WRITE is only kept if it is requested and the write
 * event is currently pending. The result of transport_fd_events() is stored in `err` and returned.
 */
err_t transport_fd__events (transport_t *transport, short ev_mask, error_t *err)
{
    struct transport_fd *fd = transport_check(transport, &transport_fd_type);
    short wanted = 0;

    // read follows the request directly
    if (ev_mask & TRANSPORT_READ)
        wanted |= TRANSPORT_READ;

    // write is kept only when requested AND already pending
    if (ev_mask & TRANSPORT_WRITE) {
        if (event_pending(fd->ev_write, EV_WRITE, NULL))
            wanted |= TRANSPORT_WRITE;
    }

    // apply, recording the result in the error info
    ERROR_CODE(err) = transport_fd_events(fd, wanted);

    return ERROR_CODE(err);
}

/**
 * transport_methods.deinit implementation: check the transport's type and release its fd state via
 * transport_fd_deinit().
 */
void transport_fd__deinit (transport_t *transport)
{
    struct transport_fd *self = transport_check(transport, &transport_fd_type);

    // release the events and the fd itself
    transport_fd_deinit(self);
}

/**
 * Type descriptor for fd-based transports.
 *
 * Its parent type is transport_type_type, and its method table is bound to the transport_fd__* functions
 * defined in this file.
 */
const struct transport_type transport_fd_type = {
    .base_type  = {
        // parent in the type hierarchy
        .parent     = &transport_type_type,
    },
    .methods    = {
        // read()/write() wrappers that translate EAGAIN into a zero-length result
        .read       = transport_fd__read,
        .write      = transport_fd__write,
        // applies a new event mask to the read/write events
        .events     = transport_fd__events,
        // releases the events and closes the fd
        .deinit     = transport_fd__deinit
    }
};

/**
 * Dummy callbacks
 */
/**
 * transport_fd callback that forwards every event to transport_invoke() on the fd's base transport.
 *
 * The callback argument is not used.
 */
void transport_fd_callback_user (struct transport_fd *fd, short what, void *arg)
{
    // no per-callback context is needed
    (void) arg;

    // forward the event flags to the generic transport layer
    transport_invoke(TRANSPORT_FD_BASE(fd), what);
}

/**
 * Function implementations
 */
/**
 * Initialize a transport_fd with the given event base and fd.
 *
 * The asserts require that the struct is still in its zeroed state (fd of zero, no events), and that `_fd` is
 * either TRANSPORT_FD_INVALID or a non-negative fd. No events are created here and the callback is left unset.
 *
 * @param fd the transport_fd state to initialize
 * @param ev_base the libevent base to use for events created later
 * @param _fd the fd to use, or TRANSPORT_FD_INVALID
 */
void transport_fd_init (struct transport_fd *fd, struct event_base *ev_base, int _fd)
{
    // sanity-check
    assert(!fd->fd);
    assert(!fd->ev_read && !fd->ev_write);
    assert(_fd == TRANSPORT_FD_INVALID || _fd >= 0);

    // initialize
    fd->ev_base = ev_base;
    fd->fd = _fd;

    // assigned separately: a chained `cb_func = cb_arg = NULL` would assign a `void *` value to a function
    // pointer, which ISO C does not permit implicitly
    fd->cb_func = NULL;
    fd->cb_arg = NULL;
}

/**
 * Switch the fd between blocking and non-blocking mode.
 *
 * Only the O_NONBLOCK bit is changed; the fd's other file status flags are read with F_GETFL and written back
 * unchanged.
 *
 * @param fd the transport_fd, which must hold a valid fd
 * @param nonblock true to set O_NONBLOCK, false to clear it
 * @return SUCCESS, or ERR_FCNTL if either fcntl() call fails
 */
err_t transport_fd_nonblock (struct transport_fd *fd, bool nonblock)
{
    int flags;

    assert(fd->fd != TRANSPORT_FD_INVALID);

    // fetch the current flags so that they are not clobbered by F_SETFL
    if ((flags = fcntl(fd->fd, F_GETFL, 0)) < 0)
        return ERR_FCNTL;

    // toggle just the non-blocking bit
    if (nonblock)
        flags |= O_NONBLOCK;
    else
        flags &= ~O_NONBLOCK;

    // set new flags
    if (fcntl(fd->fd, F_SETFL, flags) < 0)
        return ERR_FCNTL;

    return SUCCESS;
}

/**
 * Create the read and write events for the current fd, both dispatching to transport_fd_on_event.
 *
 * Neither event may exist yet, and the fd must be valid. The read event is created with EV_PERSIST, the write
 * event without it. The events are only created here, not added.
 *
 * If either event_new() fails, transport_fd_clear() is called, which frees whatever was created and also
 * resets the stored callback, and ERR_EVENT_NEW is returned.
 */
err_t transport_fd_install (struct transport_fd *fd)
{
    assert(fd->fd != TRANSPORT_FD_INVALID);
    assert(!fd->ev_read && !fd->ev_write);

    // persistent read event
    fd->ev_read = event_new(fd->ev_base, fd->fd, EV_READ | EV_PERSIST, transport_fd_on_event, fd);

    if (fd->ev_read != NULL) {
        // non-persistent write event
        fd->ev_write = event_new(fd->ev_base, fd->fd, EV_WRITE, transport_fd_on_event, fd);

        if (fd->ev_write != NULL)
            return SUCCESS;
    }

    // one of the event_new() calls failed: drop any partial state
    transport_fd_clear(fd);

    return ERR_EVENT_NEW;
}

/**
 * Store the given callback and make sure the read/write events exist.
 *
 * The fd must be valid. If either event is missing, transport_fd_install() is called and its result returned;
 * otherwise only the callback is replaced.
 */
err_t transport_fd_setup (struct transport_fd *fd, transport_fd_callback_func cb_func, void *cb_arg)
{
    // requires a valid fd
    assert(fd->fd != TRANSPORT_FD_INVALID);

    // remember the callback
    fd->cb_func = cb_func;
    fd->cb_arg = cb_arg;

    // both events already exist -> nothing left to do
    if (fd->ev_read && fd->ev_write)
        return SUCCESS;

    // otherwise the event handlers still need to be installed
    return transport_fd_install(fd);
}

/**
 * Add the events selected by `mask` (TRANSPORT_READ and/or TRANSPORT_WRITE), without a timeout.
 *
 * Events not selected by the mask are left alone. Returns ERR_EVENT_ADD on the first event_add() failure.
 */
err_t transport_fd_enable (struct transport_fd *fd, short mask)
{
    if (mask & TRANSPORT_READ) {
        if (event_add(fd->ev_read, NULL))
            return ERR_EVENT_ADD;
    }

    if (mask & TRANSPORT_WRITE) {
        if (event_add(fd->ev_write, NULL))
            return ERR_EVENT_ADD;
    }

    return SUCCESS;
}

/**
 * Delete the events selected by `mask` (TRANSPORT_READ and/or TRANSPORT_WRITE).
 *
 * Events not selected by the mask are left alone. Returns ERR_EVENT_DEL on the first event_del() failure.
 */
err_t transport_fd_disable (struct transport_fd *fd, short mask)
{
    if (mask & TRANSPORT_READ) {
        if (event_del(fd->ev_read))
            return ERR_EVENT_DEL;
    }

    if (mask & TRANSPORT_WRITE) {
        if (event_del(fd->ev_write))
            return ERR_EVENT_DEL;
    }

    return SUCCESS;
}

err_t transport_fd_events (struct transport_fd *fd, short mask)
{
    err_t err;
    
    // enable/disable read
    if (mask & TRANSPORT_READ)
        err = event_add(fd->ev_read, NULL);
    else
        err = event_del(fd->ev_read);

    if (err)
        return err;

    // enable/disable write
    if (mask & TRANSPORT_WRITE)
        err = event_add(fd->ev_write, NULL);
    else
        err = event_del(fd->ev_write);

    if (err)
        return err;

    // ok
    return SUCCESS;
}

/**
 * Free the read/write events if they exist and reset their pointers to NULL.
 *
 * The stored callback (cb_func/cb_arg) is not touched.
 */
static void transport_fd_remove (struct transport_fd *fd)
{
    if (fd->ev_read) {
        event_free(fd->ev_read);
        fd->ev_read = NULL;
    }

    if (fd->ev_write) {
        event_free(fd->ev_write);
        fd->ev_write = NULL;
    }
}

/**
 * Remove the read/write events and forget the stored callback.
 *
 * The fd itself is left as it is.
 */
void transport_fd_clear (struct transport_fd *fd)
{
    // remove the events
    transport_fd_remove(fd);

    // clear the callbacks; assigned separately, since a chained `cb_func = cb_arg = NULL` would assign a
    // `void *` value to a function pointer, which ISO C does not permit implicitly
    fd->cb_func = NULL;
    fd->cb_arg = NULL;
}

/**
 * Apply the default setup: use transport_fd_callback_user as the callback, and enable the read event if the
 * base transport's ev_mask includes TRANSPORT_READ.
 *
 * Returns SUCCESS, or the error code from transport_fd_setup() / transport_fd_enable().
 */
err_t transport_fd_defaults (struct transport_fd *fd)
{
    error_t err;

    // install the transport_invoke callback handler
    ERROR_CODE(&err) = transport_fd_setup(fd, transport_fd_callback_user, NULL);

    if (ERROR_CODE(&err))
        return ERROR_CODE(&err);

    // read masked out -> done
    if (!(TRANSPORT_FD_BASE(fd)->info.ev_mask & TRANSPORT_READ))
        return SUCCESS;

    // enable read
    ERROR_CODE(&err) = transport_fd_enable(fd, TRANSPORT_READ);

    if (ERROR_CODE(&err))
        return ERROR_CODE(&err);

    // ok
    return SUCCESS;
}

/**
 * Replace the current fd with `_fd`, which must be TRANSPORT_FD_INVALID or non-negative.
 *
 * The old fd is closed and its events removed; the result of that close is not checked. If a callback is
 * currently set, the events are re-created for the new fd via transport_fd_install().
 */
err_t transport_fd_set (struct transport_fd *fd, int _fd)
{
    assert(_fd == TRANSPORT_FD_INVALID || _fd >= 0);

    // drop the old fd and its events; the close result is deliberately unused here
    (void) transport_fd_close(fd);

    // switch over to the new fd
    fd->fd = _fd;

    // without a callback there are no events to set up
    if (!fd->cb_func)
        return SUCCESS;

    return transport_fd_install(fd);
}

/**
 * Call transport_invoke() on the fd's base transport with the given event flags.
 */
void transport_fd_invoke (struct transport_fd *fd, short what)
{
    // forward directly to the generic transport layer
    transport_invoke(TRANSPORT_FD_BASE(fd), what);
}

/**
 * Remove the events, mark the fd as TRANSPORT_FD_INVALID, and close() the old fd if there was one.
 *
 * The fd is invalidated even if close() fails. Returns ERR_CLOSE when close() reports an error, SUCCESS
 * otherwise.
 */
err_t transport_fd_close (struct transport_fd *fd)
{
    // remember the fd, since the struct's copy is invalidated before closing
    int old_fd = fd->fd;

    // remove any installed events
    transport_fd_remove(fd);

    // invalidate fd
    fd->fd = TRANSPORT_FD_INVALID;

    // nothing was open
    if (old_fd == TRANSPORT_FD_INVALID)
        return SUCCESS;

    // close the fd
    if (close(old_fd))
        return ERR_CLOSE;

    return SUCCESS;
}

/**
 * Release the transport_fd's events and fd via transport_fd_close().
 *
 * A failure to close is only logged as a warning.
 */
void transport_fd_deinit (struct transport_fd *fd)
{
    // XXX: this might block
    err_t close_err = transport_fd_close(fd);

    if (close_err) {
        log_warn_err(close_err, "close");
    }
}