#include "transport_test.h"
#include "transport_internal.h"
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <stdarg.h>
#include <assert.h>
/**
 * Simple IO vector: one contiguous chunk of buffered data.
 *
 * Chunks are allocated with malloc() by io_buf_write() and released with free() by io_buf_destroy() or
 * transport_test_pull_buf().
 */
struct io_vec {
/** The chunk's bytes; NULL for a slot that has not been written (or has been pulled) */
char *buf;
/** Number of bytes at buf; zero for a slot that has not been written */
size_t len;
};
/**
 * Simple vectored IO-buffer: a growable array of io_vec chunks with separate read and write cursors.
 */
struct io_buf {
    /** Heap-allocated array of `count` vectors; slots that have not been written yet are zeroed */
    struct io_vec *vecs;

    /** Number of io_vec slots allocated in `vecs` */
    size_t count;

    /** The vector that the next read consumes from */
    struct io_vec *read_vec;

    /** The vector that the next write fills in */
    struct io_vec *write_vec;

    /** Read offset into the vector at `read_vec` */
    size_t off;
};
/**
 * Forward-declare our transport_type.
 *
 * The definition comes later in this file, after the method implementations it points at; those
 * implementations need the declaration up front because they pass its address to transport_check().
 */
extern const struct transport_type transport_test_type;
/**
 * A dummy transport implementation intended for testing purposes.
 *
 * Data "sent" through the transport accumulates in send_buf, and data to be "received" from it is fed
 * into recv_buf by the test code.
 */
struct transport_test {
    /** The base transport stuff */
    struct transport base;

    /** Everything written to the transport, until pulled by the test */
    struct io_buf send_buf;

    /** Everything pushed by the test, until read from the transport */
    struct io_buf recv_buf;

    /** No more data is going to be added, return EOF once all the rest is consumed */
    bool eof;
};
/**
 * Get a transport pointer from a transport_test pointer.
 *
 * Expands to the address of the embedded `base` member; tp_ptr is evaluated exactly once.
 */
#define TRANSPORT_TEST_BASE(tp_ptr) (&(tp_ptr)->base)
/**
* Grow buf->vecs if needed to ensure that buf->write_vec points to a valid io_vec
*/
static err_t io_buf_grow (struct io_buf *buf)
{
size_t read_vec_offset = buf->read_vec ? (buf->read_vec - buf->vecs) : 0;
size_t write_vec_offset = buf->write_vec ? (buf->write_vec - buf->vecs) : 0;
struct io_vec *v;
struct io_vec *vecs_tmp = buf->vecs;
// don't grow if not full
if (buf->vecs && buf->write_vec < buf->vecs + buf->count)
return SUCCESS;
// new size
buf->count = buf->count * 2 + 1;
// grow
if ((buf->vecs = realloc(buf->vecs, buf->count * sizeof(struct io_vec))) == NULL) {
// restore old value
buf->vecs = vecs_tmp;
return ERR_CALLOC;
}
// restore vec positions
buf->write_vec = buf->vecs + write_vec_offset;
buf->read_vec = buf->vecs + read_vec_offset;
// zero new vecs
for (v = buf->write_vec; v < buf->vecs + buf->count; v++)
memset(v, 0, sizeof(*v));
// ok
return SUCCESS;
}
/**
 * Write some data to an io_buf, copying it into a newly allocated vector.
 *
 * A zero-length write succeeds without storing anything: transport_test__read() treats a vector whose
 * length is zero as "nothing written here yet", so storing an empty vector would stall the reader in
 * front of any data written after it. Skipping it also avoids malloc(0), which is allowed to return NULL.
 *
 * @param buf the buffer to append to
 * @param data the bytes to copy; the pointer is not retained
 * @param len the number of bytes at data
 * @return SUCCESS, or an error code if the vector array cannot be grown or the copy cannot be allocated
 */
static err_t io_buf_write (struct io_buf *buf, const char *data, size_t len)
{
    error_t err;

    // nothing to store
    if (len == 0)
        return SUCCESS;

    // ensure there's room
    if ((ERROR_CODE(&err) = io_buf_grow(buf)))
        goto error;

    // the vector to use
    struct io_vec *vec = buf->write_vec;

    // allocate
    if ((vec->buf = malloc(len)) == NULL) {
        JUMP_SET_ERROR(&err, ERR_MEM);
    }

    // store
    vec->len = len;
    memcpy(vec->buf, data, len);

    // vec consumed
    buf->write_vec++;

    // ok
    return SUCCESS;

error:
    return ERROR_CODE(&err);
}
/**
* Destroy the io_buf, freeing all resources.
*
* The io_buf must not be used anymore.
*/
static void io_buf_destroy (struct io_buf *buf)
{
size_t i;
// free the io_vec buffers
for (i = 0; i < buf->count; i++) {
free(buf->vecs[i].buf);
}
// free the vector list
free(buf->vecs);
}
/**
 * transport_methods::read implementation.
 *
 * Hands out data from the recv_buf, at most one vector's worth per call.
 *
 * @param transport the transport, which must be a transport_test
 * @param buf_ptr where to copy the data to
 * @param len in: the capacity of buf_ptr; out: the number of bytes copied, zero if nothing is available yet
 * @param err set to ERR_EOF once the data has run out and EOF has been pushed
 * @return SUCCESS, or the result of SET_ERROR() on EOF
 */
static err_t transport_test__read (transport_t *transport, void *buf_ptr, size_t *len, error_t *err)
{
    struct transport_test *tp = transport_check(transport, &transport_test_type);
    struct io_buf *rbuf = &tp->recv_buf;
    struct io_vec *cur = rbuf->read_vec;

    // out of data: never written to, past the end of the array, or at a slot with nothing (left) in it
    if (cur == NULL || cur == rbuf->vecs + rbuf->count || rbuf->off >= cur->len) {
        // EOF!
        if (tp->eof)
            return SET_ERROR(err, ERR_EOF);

        // wait for more to be fed in
        *len = 0;

        return SUCCESS;
    }

    // what remains unread in this vector, and how much of that the caller has room for
    size_t remaining = cur->len - rbuf->off;
    size_t count = (*len > remaining) ? remaining : *len;

    // copy
    memcpy(buf_ptr, cur->buf + rbuf->off, count);

    if (count == remaining) {
        // consumed the whole vector, continue from the start of the next one
        // XXX: release data?
        rbuf->read_vec++;
        rbuf->off = 0;

    } else {
        // bytes still left in the vector
        rbuf->off += count;
    }

    // report how much was read
    *len = count;

    return SUCCESS;
}
/**
 * transport_methods::write implementation.
 *
 * Appends a copy of the data to the send_buf as a single vector. *len is left untouched, i.e. the write
 * is always reported as complete.
 *
 * @param transport the transport, which must be a transport_test
 * @param data the bytes to send
 * @param len the number of bytes at data
 * @param err receives the error code from io_buf_write()
 * @return SUCCESS, or the error code stored in err
 */
static err_t transport_test__write (transport_t *transport, const void *data, size_t *len, error_t *err)
{
    struct transport_test *tp = transport_check(transport, &transport_test_type);

    // store the whole thing in one go
    // XXX: partial writes?
    if ((ERROR_CODE(err) = io_buf_write(&tp->send_buf, data, *len)) != 0)
        return ERROR_CODE(err);

    return SUCCESS;
}
/**
 * transport_methods::events implementation.
 *
 * Looks up the transport_test and otherwise does nothing; the new mask is not stored here.
 *
 * @param transport the transport, which must be a transport_test
 * @param mask the new event mask, unused
 * @param err unused
 * @return SUCCESS, always
 */
static err_t transport_test__events (transport_t *transport, short mask, error_t *err)
{
    struct transport_test *self = transport_check(transport, &transport_test_type);

    // nothing to be done with any of these
    (void) err;
    (void) mask;
    (void) self;

    // XXX: don't re-trigger anything
    return SUCCESS;
}
/**
 * transport_methods::deinit implementation.
 *
 * Hands the transport_test over to transport_test_destroy().
 *
 * @param transport the transport, which must be a transport_test
 */
static void transport_test__deinit (transport_t *transport)
{
    transport_test_destroy(transport_check(transport, &transport_test_type));
}
/*
 * Our transport_type: derives from transport_type_type, and binds the transport methods to the
 * transport_test__* implementations above.
 */
const struct transport_type transport_test_type = {
    .base_type.parent   = &transport_type_type,

    .methods.read       = transport_test__read,
    .methods.write      = transport_test__write,
    .methods.events     = transport_test__events,
    .methods.deinit     = transport_test__deinit,
};
/**
 * Allocate a new, zero-initialized transport_test, and initialize its base transport with
 * transport_test_type and the given info.
 *
 * Allocation failure is fatal: the process is aborted rather than returning NULL. The allocation is done
 * outside of assert(), so that it still happens in builds that define NDEBUG.
 *
 * @param info passed through to transport_init()
 * @return the new transport_test, never NULL
 */
struct transport_test* transport_test_create (struct transport_info *info)
{
    struct transport_test *tp;

    // allocate
    tp = calloc(1, sizeof(*tp));

    // diagnose in debug builds, and still fail hard when assert() is compiled out
    assert(tp != NULL);

    if (tp == NULL)
        abort();

    // initialize base with our transport_type
    transport_init(TRANSPORT_TEST_BASE(tp), &transport_test_type, info);

    // ok
    return tp;
}
transport_t* transport_test_cast (struct transport_test *tp)
{
return TRANSPORT_TEST_BASE(tp);
}
void transport_test_event (struct transport_test *tp, short what)
{
// invoke, masking out as needed
// this won't do anything if all the bits are masked out
transport_invoke(TRANSPORT_TEST_BASE(tp), what & TRANSPORT_TEST_BASE(tp)->info.ev_mask);
}
/**
 * Append a copy of the given data to the recv_buf, and then signal TRANSPORT_READ.
 *
 * Failing to store the data is fatal. The write is done outside of assert(), so that it is not lost in
 * builds that define NDEBUG.
 *
 * @param tp the transport_test
 * @param data the bytes to make available for reading
 * @param len the number of bytes at data
 */
void transport_test_push_buf (struct transport_test *tp, const char *data, size_t len)
{
    // push it
    if (io_buf_write(&tp->recv_buf, data, len) != SUCCESS) {
        // diagnose in debug builds, and still fail hard when assert() is compiled out
        assert(!"transport_test_push_buf: io_buf_write failed");
        abort();
    }

    // notify
    transport_test_event(tp, TRANSPORT_READ);
}
/**
 * Push a NUL-terminated string onto the recv_buf; the terminator itself is not included.
 *
 * @param tp the transport_test
 * @param str the string to push
 */
void transport_test_push_str (struct transport_test *tp, const char *str)
{
    size_t str_len = strlen(str);

    transport_test_push_buf(tp, str, str_len);
}
/**
 * Format a string printf-style, and push the result onto the recv_buf.
 *
 * The formatted text, including its terminating NUL, must fit into TRANSPORT_TEST_FMT_MAX bytes. A
 * formatting error or an over-long result is fatal, instead of pushing truncated data. The formatting is
 * done outside of assert(), so that it still happens in builds that define NDEBUG.
 *
 * @param tp the transport_test
 * @param fmt the printf-style format string
 * @param ... the values for fmt
 */
void transport_test_push_fmt (struct transport_test *tp, const char *fmt, ...)
{
    char buf[TRANSPORT_TEST_FMT_MAX];
    va_list vargs;
    int ret;

    // format
    va_start(vargs, fmt);
    ret = vsnprintf(buf, sizeof(buf), fmt, vargs);
    va_end(vargs);

    // vsnprintf() returns a negative value on error, and otherwise the length that the complete output
    // would have, not counting the NUL; so anything >= sizeof(buf) means that the output was truncated
    assert(ret >= 0 && (size_t) ret < sizeof(buf));

    if (ret < 0 || (size_t) ret >= sizeof(buf))
        abort();

    // push it
    transport_test_push_buf(tp, buf, (size_t) ret);
}
/**
 * Mark the transport as having no more data coming, and signal TRANSPORT_READ.
 *
 * Reads report EOF once everything that was pushed earlier has been consumed.
 *
 * @param tp the transport_test
 */
void transport_test_push_eof (struct transport_test *tp)
{
    const short event = TRANSPORT_READ;

    // no more data after this
    tp->eof = true;

    // notify
    transport_test_event(tp, event);
}
/**
 * Take everything that has been written to the transport so far, as one contiguous buffer.
 *
 * The contents of all send_buf vectors are concatenated in order into a malloc()'d buffer, which is handed
 * over to the caller. The vectors are then emptied, and the write position is reset to the first vector.
 *
 * A buffer is returned even when nothing has been written; it is then at least one byte in size, with
 * *len_ptr set to zero. Allocation failure is fatal. The allocation is done outside of assert(), so that
 * it still happens in builds that define NDEBUG.
 *
 * @param tp the transport_test
 * @param buf_ptr receives the newly allocated buffer
 * @param len_ptr receives the number of data bytes in the buffer
 */
void transport_test_pull_buf (struct transport_test *tp, char **buf_ptr, size_t *len_ptr)
{
    struct io_buf *buf = &tp->send_buf;
    size_t len = 0, i, off = 0;
    char *out;

    // calculate total size
    for (i = 0; i < buf->count; i++) {
        len += buf->vecs[i].len;
    }

    // alloc; malloc(0) is allowed to return NULL, so always ask for at least one byte
    out = malloc(len ? len : 1);

    // diagnose in debug builds, and still fail hard when assert() is compiled out
    assert(out != NULL);

    if (out == NULL)
        abort();

    // copy
    for (i = 0; i < buf->count; i++) {
        struct io_vec *vec = buf->vecs + i;

        // unused slots have a NULL buf, which must not be passed to memcpy() even for zero bytes
        if (vec->len > 0) {
            memcpy(out + off, vec->buf, vec->len);
            off += vec->len;
        }

        // zero
        free(vec->buf);
        vec->buf = NULL;
        vec->len = 0;
    }

    // update return
    *buf_ptr = out;
    *len_ptr = len;

    // update write_vec
    buf->write_vec = buf->vecs;
}
/**
 * Report the given error on the transport, by passing it to transport_error() for the base transport.
 *
 * @param tp the transport_test
 * @param err the error to report
 */
void transport_test_async_error (struct transport_test *tp, const error_t *err)
{
    struct transport *base = &tp->base;

    transport_error(base, err);
}
/**
 * Release the contents of both of the transport's buffers.
 *
 * Only the send_buf and recv_buf are torn down here; tp itself is not freed by this function.
 *
 * @param tp the transport_test
 */
void transport_test_destroy (struct transport_test *tp)
{
    struct io_buf *bufs[] = { &tp->send_buf, &tp->recv_buf };
    size_t i;

    // free the buffers, send first
    for (i = 0; i < sizeof(bufs) / sizeof(bufs[0]); i++)
        io_buf_destroy(bufs[i]);
}