diff -r 5229a5d098b2 -r cefec18b8268 src/lib/transport_test.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/lib/transport_test.c Thu May 28 01:17:36 2009 +0300 @@ -0,0 +1,366 @@ +#include "transport_test.h" +#include "transport_internal.h" + +#include +#include +#include +#include +#include + +/** + * Simple IO vector + */ +struct io_vec { + /** The buffer */ + char *buf; + + /** Buffer size */ + size_t len; +}; + +/** + * Simple vectored IO-buffer + */ +struct io_buf { + /** The array of buffer-vectors, {NULL}-terminated */ + struct io_vec *vecs; + + /** The number of io_vecs */ + size_t count; + + /** Current read/write vector */ + struct io_vec *read_vec, *write_vec; + + /** Offset into current vector */ + size_t off; +}; + +/** + * Forward-declare our transport_type + */ +extern const struct transport_type transport_test_type; + + +/** + * A dummy sock_stream implementation intended for testing purposes. + */ +struct transport_test { + /** The base transport stuff */ + struct transport base; + + /** The send/recieve buffers */ + struct io_buf send_buf, recv_buf; + + /** No more data is going to be added, return EOF once all the rest is consumed */ + bool eof; +}; + +/** + * Get a transport pointer from a transport_test pointer + */ +#define TRANSPORT_TEST_BASE(tp_ptr) (&(tp_ptr)->base) + +/** + * Grow buf->vecs if needed to ensure that buf->write_vec points to a valid io_vec + */ +static err_t io_buf_grow (struct io_buf *buf) +{ + size_t read_vec_offset = buf->read_vec ? (buf->read_vec - buf->vecs) : 0; + size_t write_vec_offset = buf->write_vec ? (buf->write_vec - buf->vecs) : 0; + struct io_vec *v; + struct io_vec *vecs_tmp = buf->vecs; + + // don't grow if not full + if (buf->vecs && buf->write_vec < buf->vecs + buf->count) + return SUCCESS; + + // new size + buf->count = buf->count * 2 + 1; + + // grow + if ((buf->vecs = realloc(buf->vecs, buf->count * sizeof(struct io_vec))) == NULL) { + // restore old value + buf->vecs = vecs_tmp; + + return ERR_CALLOC; + } + + // restore vec positions + buf->write_vec = buf->vecs + write_vec_offset; + buf->read_vec = buf->vecs + read_vec_offset; + + // zero new vecs + for (v = buf->write_vec; v < buf->vecs + buf->count; v++) + memset(v, 0, sizeof(*v)); + + // ok + return SUCCESS; +} + +/** + * Write some data to an io_buf, copying it. + */ +static err_t io_buf_write (struct io_buf *buf, const char *data, size_t len) +{ + error_t err; + + // ensure there's room + if ((ERROR_CODE(&err) = io_buf_grow(buf))) + goto error; + + // the vector to use + struct io_vec *vec = buf->write_vec; + + // allocate + if ((vec->buf = malloc(len)) == NULL) + JUMP_SET_ERROR(&err, ERR_MEM); + + // store + vec->len = len; + memcpy(vec->buf, data, len); + + // vec consumed + buf->write_vec++; + + // ok + return SUCCESS; + +error: + return ERROR_CODE(&err); +} + +/** + * Destroy the io_buf, freeing all resources. + * + * The io_buf must not be used anymore. + */ +static void io_buf_destroy (struct io_buf *buf) +{ + size_t i; + + // free the io_vec buffers + for (i = 0; i < buf->count; i++) { + free(buf->vecs[i].buf); + } + + // free the vector list + free(buf->vecs); +} + +/** + * transport_methods::read implementation. + */ +static err_t transport_test__read (transport_t *transport, void *buf_ptr, size_t *len, error_t *err) +{ + struct transport_test *tp = transport_check(transport, &transport_test_type); + struct io_buf *buf = &tp->recv_buf; + struct io_vec *vec = buf->read_vec; + + // EOF/nonblock if we're past the end of the last vector + if (!vec || vec == buf->vecs + buf->count || buf->off >= vec->len) { + if (!tp->eof) { + // wait for more to be fed in + *len = 0; + return SUCCESS; + + } else { + // EOF! + return SET_ERROR(err, ERR_EOF); + } + } + + // amount of data available in this iovec + size_t available = vec->len - buf->off; + + // amount to read + size_t to_read = *len; + + // trim down? + if (to_read > available) + to_read = available; + + // copy + memcpy(buf_ptr, vec->buf + buf->off, to_read); + + + if (to_read < available) { + // bytes still left in the vector + buf->off += to_read; + + } else { + // consumed the whole vector + // XXX: release data? + buf->read_vec++; + buf->off = 0; + } + + // update len + *len = to_read; + + // ok + return SUCCESS; +} + +/** + * transport_methods::write implementation. + */ +static err_t transport_test__write (transport_t *transport, const void *data, size_t *len, error_t *err) +{ + struct transport_test *tp = transport_check(transport, &transport_test_type); + + // write it out + // XXX: partial writes? + if ((ERROR_CODE(err) = io_buf_write(&tp->send_buf, data, *len))) + goto error; + + // ok + return SUCCESS; + +error: + return ERROR_CODE(err); +} + +static err_t transport_test__events (transport_t *transport, short mask, error_t *err) +{ + struct transport_test *tp = transport_check(transport, &transport_test_type); + + (void) tp; + (void) mask; + (void) err; + + // XXX: don't re-trigger anything + + return SUCCESS; +} + +static void transport_test__deinit (transport_t *transport) +{ + struct transport_test *tp = transport_check(transport, &transport_test_type); + + transport_test_destroy(tp); +} + +/* + * Our sock_stream_type + */ +const struct transport_type transport_test_type = { + .base_type = { + .parent = &transport_type_type, + }, + .methods = { + .read = transport_test__read, + .write = transport_test__write, + .events = transport_test__events, + .deinit = transport_test__deinit + }, +}; + +struct transport_test* transport_test_create (struct transport_info *info) +{ + struct transport_test *tp; + + // allocate + assert((tp = calloc(1, sizeof(*tp)))); + + // initialize base with our transport_type + transport_init(TRANSPORT_TEST_BASE(tp), &transport_test_type, info); + + // ok + return tp; +} + +transport_t* transport_test_cast (struct transport_test *tp) +{ + return TRANSPORT_TEST_BASE(tp); +} + +void transport_test_event (struct transport_test *tp, short what) +{ + // invoke, masking out as needed + // this won't do anything if all the bits are masked out + transport_invoke(TRANSPORT_TEST_BASE(tp), what & TRANSPORT_TEST_BASE(tp)->info.ev_mask); +} + +void transport_test_push_buf (struct transport_test *tp, const char *data, size_t len) +{ + // push it + assert(io_buf_write(&tp->recv_buf, data, len) == SUCCESS); + + // notify + transport_test_event(tp, TRANSPORT_READ); +} + +void transport_test_push_str (struct transport_test *tp, const char *str) +{ + // push it + transport_test_push_buf(tp, str, strlen(str)); +} + +void transport_test_push_fmt (struct transport_test *tp, const char *fmt, ...) +{ + char buf[TRANSPORT_TEST_FMT_MAX]; + size_t ret; + + // format + va_list vargs; va_start(vargs, fmt); + assert((ret = vsnprintf(buf, sizeof(buf), fmt, vargs)) <= sizeof(buf)); + va_end(vargs); + + // push it + transport_test_push_buf(tp, buf, ret); +} + +void transport_test_push_eof (struct transport_test *tp) +{ + // update state + tp->eof = true; + + transport_test_event(tp, TRANSPORT_READ); +} + +void transport_test_pull_buf (struct transport_test *tp, char **buf_ptr, size_t *len_ptr) +{ + struct io_buf *buf = &tp->send_buf; + size_t len = 0, i, off = 0; + char *out; + + // calculate total size + for (i = 0; i < buf->count; i++) { + len += buf->vecs[i].len; + } + + // alloc + assert((out = malloc(len))); + + // copy + for (i = 0; i < buf->count; i++) { + struct io_vec *vec = buf->vecs + i; + + memcpy(out + off, vec->buf, vec->len); + off += vec->len; + + // zero + free(vec->buf); vec->buf = NULL; + vec->len = 0; + } + + // update return + *buf_ptr = out; + *len_ptr = len; + + // update write_vec + buf->write_vec = buf->vecs; +} + +void transport_test_async_error (struct transport_test *tp, const error_t *err) +{ + transport_error(&tp->base, err); +} + +void transport_test_destroy (struct transport_test *tp) +{ + // free the buffers + io_buf_destroy(&tp->send_buf); + io_buf_destroy(&tp->recv_buf); +} +