#include "transport_fd.h"
#include "log.h"
#include <fcntl.h>
#include <unistd.h>
#include <assert.h>
/**
* Our libevent callback
*/
static void transport_fd_on_event (evutil_socket_t _fd, short ev_what, void *arg)
{
struct transport_fd *fd = arg;
(void) _fd;
short what = 0;
// build flags
if (ev_what & EV_READ)
what |= TRANSPORT_READ;
if (ev_what & EV_WRITE)
what |= TRANSPORT_WRITE;
// invoke user callback
fd->cb_func(fd, what, fd->cb_arg);
}
/**
* Our transport_methods implementations
*/
err_t transport_fd__read (transport_t *transport, void *buf, size_t *len, error_t *err)
{
struct transport_fd *fd = transport_check(transport, &transport_fd_type);
int ret;
RESET_ERROR(err);
// read(), and detect non-EAGAIN or EOF
if ((ret = read(fd->fd, buf, *len)) < 0 && errno != EAGAIN)
// unexpected error
RETURN_SET_ERROR_ERRNO(err, ERR_READ);
else if (ret == 0)
// EOF
return SET_ERROR(err, ERR_EOF);
if (ret < 0) {
// EAGAIN -> zero bytes
*len = 0;
} else {
// normal -> bytes read
*len = ret;
}
// ok
return SUCCESS;
}
err_t transport_fd__write (transport_t *transport, const void *buf, size_t *len, struct error_info *err)
{
struct transport_fd *fd = transport_check(transport, &transport_fd_type);
int ret;
RESET_ERROR(err);
// write(), and detect non-EAGAIN or EOF
if ((ret = write(fd->fd, buf, *len)) < 0 && errno != EAGAIN)
// unexpected error
RETURN_SET_ERROR_ERRNO(err, ERR_WRITE);
else if (ret == 0)
// EOF
return SET_ERROR(err, ERR_WRITE_EOF);
if (ret < 0) {
// EAGAIN -> zero bytes
*len = 0;
if (transport->info.ev_mask & TRANSPORT_WRITE)
// enable the write event
if ((ERROR_CODE(err) = transport_fd_enable(fd, TRANSPORT_WRITE)))
return ERROR_CODE(err);
} else {
// normal -> bytes read
*len = ret;
}
return SUCCESS;
}
err_t transport_fd__events (transport_t *transport, short ev_mask, error_t *err)
{
struct transport_fd *fd = transport_check(transport, &transport_fd_type);
short mask = 0;
// enable read as requested
if (ev_mask & TRANSPORT_READ)
mask |= TRANSPORT_READ;
// enable write if requested and it's currently enabled
if ((ev_mask & TRANSPORT_WRITE) && event_pending(fd->ev_write, EV_WRITE, NULL))
mask |= TRANSPORT_WRITE;
// set
return (ERROR_CODE(err) = transport_fd_events(fd, mask));
}
void transport_fd__deinit (transport_t *transport)
{
struct transport_fd *fd = transport_check(transport, &transport_fd_type);
transport_fd_deinit(fd);
}
const struct transport_type transport_fd_type = {
.base_type = {
.parent = &transport_type_type,
},
.methods = {
.read = transport_fd__read,
.write = transport_fd__write,
.events = transport_fd__events,
.deinit = transport_fd__deinit
}
};
/**
* Dummy callbacks
*/
void transport_fd_callback_user (struct transport_fd *fd, short what, void *arg)
{
(void) arg;
// proxy
transport_invoke(TRANSPORT_FD_BASE(fd), what);
}
/**
* Function implementations
*/
void transport_fd_init (struct transport_fd *fd, struct event_base *ev_base, int _fd)
{
// sanity-check
assert(!fd->fd);
assert(!fd->ev_read && !fd->ev_write);
assert(_fd == TRANSPORT_FD_INVALID || _fd >= 0);
// initialize
fd->ev_base = ev_base;
fd->fd = _fd;
fd->cb_func = fd->cb_arg = NULL;
}
err_t transport_fd_nonblock (struct transport_fd *fd, bool nonblock)
{
assert(fd->fd != TRANSPORT_FD_INVALID);
// XXX: maintain old flags?
// set new flags
if (fcntl(fd->fd, F_SETFL, nonblock ? O_NONBLOCK : 0) < 0)
return ERR_FCNTL;
return SUCCESS;
}
/**
* Install our internal event handler.
*
* The events should not already be set up.
*
* Cleans up partial events on errors
*/
err_t transport_fd_install (struct transport_fd *fd)
{
assert(fd->fd != TRANSPORT_FD_INVALID);
assert(!fd->ev_read && !fd->ev_write);
// create new events
if ((fd->ev_read = event_new(fd->ev_base, fd->fd, EV_READ | EV_PERSIST, transport_fd_on_event, fd)) == NULL)
goto err_event_add;
if ((fd->ev_write = event_new(fd->ev_base, fd->fd, EV_WRITE, transport_fd_on_event, fd)) == NULL)
goto err_event_add;
// ok
return SUCCESS;
err_event_add:
// remove partial events
transport_fd_clear(fd);
return ERR_EVENT_NEW;
}
err_t transport_fd_setup (struct transport_fd *fd, transport_fd_callback_func cb_func, void *cb_arg)
{
// requires a valid fd
assert(fd->fd != TRANSPORT_FD_INVALID);
// store
fd->cb_func = cb_func;
fd->cb_arg = cb_arg;
// install the event handlers?
if (!fd->ev_read || !fd->ev_write)
return transport_fd_install(fd);
else
return SUCCESS;
}
err_t transport_fd_enable (struct transport_fd *fd, short mask)
{
// just add the appropriate events
if (mask & TRANSPORT_READ && event_add(fd->ev_read, NULL))
return ERR_EVENT_ADD;
if (mask & TRANSPORT_WRITE && event_add(fd->ev_write, NULL))
return ERR_EVENT_ADD;
return SUCCESS;
}
err_t transport_fd_disable (struct transport_fd *fd, short mask)
{
if (mask & TRANSPORT_READ && event_del(fd->ev_read))
return ERR_EVENT_DEL;
if (mask & TRANSPORT_WRITE && event_del(fd->ev_write))
return ERR_EVENT_DEL;
return SUCCESS;
}
err_t transport_fd_events (struct transport_fd *fd, short mask)
{
err_t err;
// enable/disable read
if (mask & TRANSPORT_READ)
err = event_add(fd->ev_read, NULL);
else
err = event_del(fd->ev_read);
if (err)
return err;
// enable/disable write
if (mask & TRANSPORT_WRITE)
err = event_add(fd->ev_write, NULL);
else
err = event_del(fd->ev_write);
if (err)
return err;
// ok
return SUCCESS;
}
/**
* Remove our current ev_* events, but leave the cb_* intact.
*/
static void transport_fd_remove (struct transport_fd *fd)
{
if (fd->ev_read)
event_free(fd->ev_read);
if (fd->ev_write)
event_free(fd->ev_write);
fd->ev_read = NULL;
fd->ev_write = NULL;
}
void transport_fd_clear (struct transport_fd *fd)
{
// remove the events
transport_fd_remove(fd);
// clear the callbacks
fd->cb_func = fd->cb_arg = NULL;
}
err_t transport_fd_defaults (struct transport_fd *fd)
{
error_t err;
// install the transport_invoke callback handler
if ((ERROR_CODE(&err) = transport_fd_setup(fd, transport_fd_callback_user, NULL)))
goto error;
// enable read unless masked out
if (TRANSPORT_FD_BASE(fd)->info.ev_mask & TRANSPORT_READ) {
if ((ERROR_CODE(&err) = transport_fd_enable(fd, TRANSPORT_READ)))
goto error;
}
// ok
return SUCCESS;
error:
return ERROR_CODE(&err);
}
err_t transport_fd_set (struct transport_fd *fd, int _fd)
{
assert(_fd == TRANSPORT_FD_INVALID || _fd >= 0);
// close the old stuff
transport_fd_close(fd);
// set the new one
fd->fd = _fd;
// do we have callbacks that we need to setup?
if (fd->cb_func)
return transport_fd_install(fd);
else
return SUCCESS;
}
void transport_fd_invoke (struct transport_fd *fd, short what)
{
// invoke
transport_invoke(TRANSPORT_FD_BASE(fd), what);
}
err_t transport_fd_close (struct transport_fd *fd)
{
int _fd = fd->fd;
// remove any installed events
transport_fd_remove(fd);
// invalidate fd
fd->fd = TRANSPORT_FD_INVALID;
// close the fd
if (_fd != TRANSPORT_FD_INVALID && close(_fd))
return ERR_CLOSE;
return SUCCESS;
}
void transport_fd_deinit (struct transport_fd *fd)
{
err_t tmp;
// XXX: this might block
if ((tmp = transport_fd_close(fd)))
log_warn_err(tmp, "close");
}