implement msg_proto and associated test, fix misc. other bugs (including changing error_info::code to a signed int\!)
authorTero Marttila <terom@fixme.fi>
Sun, 10 May 2009 23:37:43 +0300
changeset 196 873796250c60
parent 195 42aedce3e2eb
child 197 3f9175ead92a
implement msg_proto and associated test, fix misc. other bugs (including changing error_info::code to a signed int\!)
src/CMakeLists.txt
src/error.c
src/error.h
src/msg_proto.c
src/msg_proto.h
src/test/assert.c
src/test/assert.h
src/test/fail.h
src/test/msg_proto.c
src/test/test_list.inc
src/test/transport.c
src/test/transport.h
src/transport.h
src/transport_test.c
src/transport_test.h
--- a/src/CMakeLists.txt	Fri May 08 02:51:20 2009 +0300
+++ b/src/CMakeLists.txt	Sun May 10 23:37:43 2009 +0300
@@ -11,12 +11,13 @@
 
 # define our source code modules
 set (CORE_SOURCES error.c log.c str.c object.c)
-set (IO_SOURCES transport.c service.c transport_fd.c sock.c resolve.c tcp.c tcp_transport.c tcp_client.c tcp_server.c ssl.c ssl_client.c fifo.c line_proto.c)
+set (IO_SOURCES transport.c service.c transport_fd.c sock.c resolve.c tcp.c tcp_transport.c tcp_client.c tcp_server.c ssl.c ssl_client.c fifo.c)
+set (PROTO_SOURCES line_proto.c msg_proto.c)
 set (IRC_SOURCES irc_line.c irc_conn.c irc_net.c irc_chan.c chain.c irc_cmd.c irc_proto.c irc_client.c irc_user.c irc_queue.c irc_net_connect.c)
 set (LUA_SOURCES nexus_lua.c lua_objs.c lua_config.c lua_irc.c lua_func.c lua_type.c)
 set (CONSOLE_SOURCES console.c lua_console.c)
 
-set (NEXUS_SOURCES nexus.c ${CORE_SOURCES} ${IO_SOURCES} ${IRC_SOURCES} ${LUA_SOURCES} ${CONSOLE_SOURCES} signals.c module.c config.c)
+set (NEXUS_SOURCES nexus.c ${CORE_SOURCES} ${IO_SOURCES} ${PROTO_SOURCES} ${IRC_SOURCES} ${LUA_SOURCES} ${CONSOLE_SOURCES} signals.c module.c config.c)
 set (IRC_LOG_SOURCES modules/irc_log.c)
 set (LOGWATCH_SOURCES modules/logwatch.c modules/logwatch_source.c modules/logwatch_filter.c modules/logwatch_chan.c)
 
@@ -55,7 +56,7 @@
 if (ENABLE_TEST)
     # build list of source files
     file (GLOB _TEST_SOURCES "test/*.c")
-    set (TEST_SOURCES ${_TEST_SOURCES} ${CORE_SOURCES} ${IO_SOURCES} transport_test.c ${IRC_SOURCES})
+    set (TEST_SOURCES ${_TEST_SOURCES} ${CORE_SOURCES} ${IO_SOURCES} ${PROTO_SOURCES} transport_test.c ${IRC_SOURCES})
     
     # add executable target and link against libs
     add_executable (test_harness EXCLUDE_FROM_ALL ${TEST_SOURCES})
--- a/src/error.c	Fri May 08 02:51:20 2009 +0300
+++ b/src/error.c	Sun May 10 23:37:43 2009 +0300
@@ -123,10 +123,7 @@
     NULL
 };
 
-/**
- * Look up the error_desc for the given error code
- */
-static const struct error_desc* error_lookup_desc (err_t code)
+const struct error_desc* error_lookup (err_t code)
 {
     struct error_desc **desc_table, *desc = NULL;
 
@@ -152,7 +149,7 @@
         // no error...
         return "success";
 
-    else if ((desc = error_lookup_desc(code)))
+    else if ((desc = error_lookup(code)))
         // found an error_desc for it
         return desc->name;
 
@@ -167,7 +164,7 @@
     const struct error_desc *desc;
     
     // do we have an error_desc for it?
-    if ((desc = error_lookup_desc(err->code)) == NULL)
+    if ((desc = error_lookup(err->code)) == NULL)
         // ???
         snprintf(msg, ERROR_MSG_MAXLEN, "[%#.8x]: %#.8x", err->code, err->extra);
     
@@ -209,3 +206,37 @@
     return msg;
 }
 
+bool error_cmp_eq (const error_t *a, const error_t *b)
+{
+    const struct error_desc *desc;
+
+    // compare the top-level code
+    if (a->code != b->code)
+        return false;
+
+    // lookup the extra type
+    if ((desc = error_lookup(a->code)) == NULL)
+        // not good...
+        return false;
+    
+    // compare by type
+    switch (desc->extra_type) {
+        case ERR_EXTRA_NONE:
+            return true;
+            
+        case ERR_EXTRA_ERRNO:
+        case ERR_EXTRA_GAI:
+        case ERR_EXTRA_GNUTLS:
+            // integer comparison
+            return (a->extra == b->extra);
+
+        case ERR_EXTRA_STR:
+            // string comparison
+            return a->extra_str && b->extra_str && (strcmp(a->extra_str, b->extra_str) == 0);
+
+        default:
+            // ???
+            return false;
+    }
+}
+
--- a/src/error.h	Fri May 08 02:51:20 2009 +0300
+++ b/src/error.h	Sun May 10 23:37:43 2009 +0300
@@ -7,6 +7,7 @@
  * Error-handling functions
  */
 #include <errno.h>
+#include <stdbool.h>
 
 /**
  * The type used for error codes is an explicitly *unsigned* int, meaning that error codes themselves are positive.
@@ -161,8 +162,12 @@
  * An error code and associated extra infos
  */
 struct error_info {
-    /** The base error code */
-    err_t code;
+    /** 
+     * The base error code.
+     *
+     * This is a signed int because we need to be able to manipulate negative errors codes as well.
+     */
+    signed int code;
     
     union {
         /** Additional detail info, usually some third-party error code, as defined by the code's ERR_EXTRA_* */
@@ -184,6 +189,11 @@
 const char *error_name (err_t code);
 
 /**
+ * Look up the error_desc for the given error code
+ */
+const struct error_desc* error_lookup (err_t code);
+
+/**
  * Maximum length of error messages returned by error_msg (including NUL byte)
  */
 #define ERROR_MSG_MAXLEN 1024
@@ -193,7 +203,12 @@
  *
  * This is returned as a pointer into a statically allocated buffer. It is not re-entrant.
  */
-const char *error_msg (const struct error_info *err);
+const char *error_msg (const error_t *err);
+
+/**
+ * Compare the given errors for equivalency
+ */
+bool error_cmp_eq (const error_t *a, const error_t *b);
 
 /** No error, evaulates as logical false */
 #define SUCCESS (0)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/msg_proto.c	Sun May 10 23:37:43 2009 +0300
@@ -0,0 +1,493 @@
+#include "msg_proto.h"
+
+#include <string.h>
+#include <stdint.h>
+#include <arpa/inet.h>
+
+/**
+ * I/O buffer
+ */
+struct msg_buf {
+    /** Buffer base pointer */
+    char *base;
+
+    /** Size of the buffer */
+    size_t size;
+
+    /** Current read/write offset */
+    size_t off;
+};
+
+/**
+ * The minimum size used for any msg_buf::size related operation.
+ */
+#define MSG_BUF_MIN_SIZE 1024
+
+/**
+ * Growth rate for size
+ */
+#define MSG_BUF_GROW_RATE 2
+
+/**
+ * Initialize a message buffer at the given initial size
+ */
+err_t msg_buf_init (struct msg_buf *buf, size_t hint)
+{
+    // apply minimum size
+    if (hint < MSG_BUF_MIN_SIZE)
+        hint = MSG_BUF_MIN_SIZE;
+
+    // allocate the initial buffer
+    if ((buf->base = malloc(hint)) == NULL)
+        return ERR_MEM;
+    
+    // set fields
+    buf->size = hint;
+    buf->off = 0;
+
+    // ok
+    return SUCCESS;
+}
+
+/**
+ * Grow the buffer if needed to fit the given capacity.
+ */
+err_t msg_buf_grow (struct msg_buf *buf, size_t size)
+{
+    char *tmp = buf->base;
+
+    if (buf->size >= size)
+        // nothing to do
+        return SUCCESS;
+
+    // calculate new size
+    while (buf->size < size)
+        buf->size *= MSG_BUF_GROW_RATE;
+
+    // resize
+    if ((buf->base = realloc(buf->base, buf->size)) == NULL) {
+        buf->base = tmp;
+
+        return ERR_MEM;
+    }
+
+    // ok
+    return SUCCESS;
+}
+
+/**
+ * Drain \a len bytes off the head of the buffer
+ */
+err_t msg_buf_drain (struct msg_buf *buf, size_t len)
+{
+    // simple memmove
+    memmove(buf->base, buf->base + len, buf->off - len);
+
+    // update offfset
+    buf->off -= len;
+    
+    // ok
+    return SUCCESS;
+}
+
+/**
+ * Read into the buffer from a transport_t.
+ *
+ * This will attempt to read \a len bytes onto the end of the buffer, growing it if needed to fit.
+ *
+ * This may read/return more data than the given len. Use msg_buf_drain the remove the data from the buffer once you
+ * have used it.
+ *
+ * Returns the number of new bytes read, zero for transport read buffer empty, -err_t for error.
+ */
+ssize_t msg_buf_read (struct msg_buf *buf, transport_t *transport, size_t len, error_t *err)
+{
+    ssize_t ret;
+
+    // clamp size
+    if (len < MSG_BUF_MIN_SIZE)
+        len = MSG_BUF_MIN_SIZE;
+
+    // ensure space
+    if ((ERROR_CODE(err) = msg_buf_grow(buf, buf->off + len)))
+        goto error;
+
+    // read
+    if ((ret = transport_read(transport, buf->base + buf->off, len, err)) < 0)
+        goto error;
+    
+    // no data left?
+    if (!ret)
+        return 0;
+
+    // update offset
+    buf->off += ret;
+
+    // ok
+    return ret;
+
+error:
+    return -ERROR_CODE(err);    
+}
+
+/**
+ * Drives transport_write on the given data until all the given data is written, or zero is returned.
+ *
+ * @param transport transport to write to
+ * @param data input data
+ * @param len number of bytes to write from data
+ * @param err returned error info
+ * @return number of bytes written (which may be zero or less than len), or -err_t.
+ */
+static ssize_t _transport_write_all (transport_t *transport, const char *data, size_t len, error_t *err)
+{
+    ssize_t ret;
+    size_t written = 0;
+
+    while (len) {
+        // try and write out remaining data
+        if ((ret = transport_write(transport, data, len, err)) < 0)
+            goto error;
+        
+        if (!ret) {
+            // write buffer full
+            break;
+
+        } else { 
+            // update and continue
+            written += ret;
+            data += ret;
+            len -= ret;
+        }
+    }
+
+    // ok
+    return written;
+
+error:
+    return -ERROR_CODE(err);    
+}
+
+/**
+ * If the buffer is empty, this will attempt to write the given data directly using transport_write until either all
+ * the data is written (in which case nothing more needs to be done), or the transport won't accept any more writes,
+ * in which case the remaining data will be buffered.
+ *
+ * If the buffer is not empty, then the given data will be added to the end of the buffer, since otherwise the order of
+ * data would be broken.
+ *
+ * In either case, transport_write semantics garuntee that our buffer will either be empty, or an on_write will be
+ * pending on the transport. See msg_buf_flush() for how to handle transport_callbacks::on_write.
+ */
+err_t msg_buf_write (struct msg_buf *buf, transport_t *transport, const void *data_ptr, size_t len, error_t *err)
+{
+    ssize_t ret;
+    const char *data = data_ptr;
+
+    if (!buf->off) {
+        // no data buffered, so we can try and write directly
+        if ((ret = _transport_write_all(transport, data, len, err)) < 0)
+            goto error;
+    
+        // update written
+        data += ret;
+        len -= ret;
+        
+        if (len == 0)
+            // wrote it all
+            return SUCCESS;
+    }
+
+    // ensure space
+    if ((ERROR_CODE(err) = msg_buf_grow(buf, buf->off + len)))
+        goto error;
+
+    // store
+    memcpy(buf->base + buf->off, data, len);
+    
+    // update
+    buf->off += len;
+
+    // ok
+    return SUCCESS;
+
+error:
+    return ERROR_CODE(err);    
+}
+
+/**
+ * Flush buffered write data to the transport, driving transport_write() until either all of our bufferd data has been
+ * written, or the transport will not accept any more.
+ *
+ * In either case, transport_write semantics garuntee that our buffer will either be empty, or an on_write will be
+ * pending on the transport.
+ */
+err_t msg_buf_flush (struct msg_buf *buf, transport_t *transport, error_t *err)
+{
+    ssize_t ret;
+
+    // write
+    if ((ret = _transport_write_all(transport, buf->base, buf->off, err)) < 0)
+        goto error;
+    
+    if (ret)
+        // unbuffer the written data
+        msg_buf_drain(buf, ret);
+
+    // ok
+    return SUCCESS;
+
+error:
+    return ERROR_CODE(err);    
+}
+
+/**
+ * Deinitialize msg_buf to release allocated buffers
+ */
+void msg_buf_deinit (struct msg_buf *buf)
+{
+    // release
+    free(buf->base);
+
+    // reset
+    buf->base = NULL;
+    buf->size = buf->off = 0;
+}
+
+/**
+ * Message header
+ */
+struct msg_header {
+    /** Message length, including header */
+    uint16_t len;
+};
+
+/**
+ * Message header size
+ */
+#define MSG_PROTO_HEADER_SIZE (sizeof(uint16_t))
+
+/**
+ * Our state struct
+ */
+struct msg_proto {
+    /** The transport */
+    transport_t *transport;
+
+    /** User callbacks */
+    const struct msg_proto_callbacks *cb_tbl;
+
+    /** User callback argument */
+    void *cb_arg;
+
+    /** Input buffer */
+    struct msg_buf in;
+
+    /** Output buffer */
+    struct msg_buf out;
+};
+
+/**
+ * Signal error to user
+ */
+static void msg_proto_error (struct msg_proto *proto, const error_t *err)
+{
+    // invoke user callback
+    proto->cb_tbl->on_error(proto, err, proto->cb_arg);
+}
+
+/**
+ * Attempt to read the current header from our input buffer.
+ *
+ * Returns >0 for full header, 0 for incomplete header, -err_t for error.
+ */
+static int msg_proto_peek_header (struct msg_proto *proto, struct msg_header *header, error_t *err)
+{
+    if (proto->in.off < MSG_PROTO_HEADER_SIZE)
+        // not enough data for header
+        return 0;
+
+    // read header
+    header->len = ntohs(*((uint16_t *) proto->in.base));
+
+    // bad header?
+    if (header->len < MSG_PROTO_HEADER_SIZE)
+        JUMP_SET_ERROR_STR(err, ERR_MISC, "message_header::len");
+
+    // ok, got header
+    return 1;
+
+error:
+    return -ERROR_CODE(err);    
+}
+
+/**
+ * Recieved a message with the given header, and a pointer to the message data
+ *
+ * XXX: what to do if the user callback destroys the msg_proto?
+ */
+static err_t msg_proto_on_msg (struct msg_proto *proto, struct msg_header *header, char *data, error_t *err)
+{
+    (void) err;
+
+    // invoke user callback
+    proto->cb_tbl->on_msg(proto, data, header->len - MSG_PROTO_HEADER_SIZE, proto->cb_arg);
+
+    // XXX: handle user errors
+    return SUCCESS;
+}
+
+static void msg_proto_on_read (transport_t *transport, void *arg)
+{
+    struct msg_proto *proto = arg;
+    struct msg_header header;
+    ssize_t ret;
+    error_t err;
+    
+    // we might be able to read more than one message per event
+    do {
+        // try and read message length for incomplete message
+        if ((ret = msg_proto_peek_header(proto, &header, &err)) < 0)
+            goto error;
+        
+        // need to read more data?
+        if (!ret || header.len > proto->in.off) {
+            // msg_buf_read a minimum size, so passing a zero is OK
+            size_t to_read = ret ? header.len : 0;
+
+            // read into our buffer
+            if ((ret = msg_buf_read(&proto->in, transport, to_read, &err)) < 0)
+                goto error;
+    
+        } else {
+            // handle full message
+            if (msg_proto_on_msg(proto, &header, proto->in.base + MSG_PROTO_HEADER_SIZE, &err))
+                goto error;
+            
+            // remove the data from the buffer
+            msg_buf_drain(&proto->in, header.len);
+        }
+    } while (ret);
+    
+    // ok
+    return;
+
+error:
+    // notify user
+    msg_proto_error(proto, &err);    
+}
+
+static void msg_proto_on_write (transport_t *transport, void *arg)
+{
+    struct msg_proto *proto = arg;
+    error_t err;
+
+    // flush
+    if (msg_buf_flush(&proto->out, transport, &err))
+        // notify user on transport errors
+        msg_proto_error(proto, &err);
+}
+
+static void msg_proto_on_error (transport_t *transport, const error_t *err, void *arg)
+{
+    struct msg_proto *proto = arg;
+
+    (void) transport;
+
+    // report to user
+    msg_proto_error(proto, err);
+}
+
+static const struct transport_callbacks msg_proto_transport_callbacks = {
+    .on_read    = msg_proto_on_read,
+    .on_write   = msg_proto_on_write,
+    .on_error   = msg_proto_on_error,
+};
+
+err_t msg_proto_create (struct msg_proto **proto_ptr, transport_t *transport, const struct msg_proto_callbacks *cb_tbl, void *cb_arg, error_t *err)
+{
+    struct msg_proto *proto;
+
+    // alloc
+    if ((proto = calloc(1, sizeof(*proto))) == NULL)
+        return ERR_MEM;
+
+    // store
+    proto->transport = transport;
+    proto->cb_tbl = cb_tbl;
+    proto->cb_arg = cb_arg;
+
+    // init
+    if (
+            (ERROR_CODE(err) = msg_buf_init(&proto->in, 0))
+        ||  (ERROR_CODE(err) = msg_buf_init(&proto->out, 0))
+    )
+        goto error;
+
+    // setup transport
+    if ((ERROR_CODE(err) = transport_events(transport, TRANSPORT_READ | TRANSPORT_WRITE)))
+        goto error;
+
+    transport_set_callbacks(transport, &msg_proto_transport_callbacks, proto);
+
+    // ok
+    *proto_ptr = proto;
+
+    return SUCCESS;
+
+error:
+    // release
+    msg_proto_destroy(proto);
+
+    return ERROR_CODE(err);
+}
+
+/**
+ * Build and write out the data for the given header
+ */
+static err_t msg_proto_write_header (struct msg_proto *proto, const struct msg_header *header, error_t *err)
+{
+    char buf[MSG_PROTO_HEADER_SIZE];
+
+    // validate
+    if (header->len < MSG_PROTO_HEADER_SIZE)
+        return SET_ERROR(err, ERR_MISC);
+
+    // build
+    *((uint16_t *) buf) = htons(header->len);
+
+    // write
+    return msg_buf_write(&proto->out, proto->transport, buf, sizeof(buf), err);
+}
+
+err_t msg_proto_send (struct msg_proto *proto, const void *data, size_t len, error_t *err)
+{
+    struct msg_header header;
+
+    // build header
+    header.len = MSG_PROTO_HEADER_SIZE + len;
+
+    // write it
+    if (
+            msg_proto_write_header(proto, &header, err)
+        ||  msg_buf_write(&proto->out, proto->transport, data, len, err)
+    )
+        return ERROR_CODE(err);
+
+    // ok
+    return SUCCESS;
+}
+
+void msg_proto_destroy (struct msg_proto *proto)
+{
+    // drop buffers
+    msg_buf_deinit(&proto->in);
+    msg_buf_deinit(&proto->out);
+
+    // kill transport
+    transport_destroy(proto->transport);
+
+    // release ourself
+    free(proto);
+}
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/msg_proto.h	Sun May 10 23:37:43 2009 +0300
@@ -0,0 +1,54 @@
+#ifndef MSG_PROTO_H
+#define MSG_PROTO_H
+
+/**
+ * @param
+ *
+ * Support for simple protocols that send/recieve length-prefixed messages over a transport stream.
+ *
+ * This implementation is mostly geared towards handling a reasonable number of reasonably sized messages in a
+ * reasonable way. Hence, 
+ */
+#include "transport.h"
+
+/** 
+ * Protocol state struct
+ */
+struct msg_proto;
+
+/**
+ * User callbacks
+ */
+struct msg_proto_callbacks {
+    /**
+     * Message recieved.
+     *
+     * XXX: currently you must not call msg_proto_destroy from within this callback
+     */
+    void (*on_msg) (struct msg_proto *proto, void *data, size_t len, void *arg);
+
+    /**
+     * Transport/protocol error occured in event handling.
+     */
+    void (*on_error) (struct msg_proto *proto, const error_t *err, void *arg);
+};
+
+/**
+ * Create a msg_proto state using the given transport.
+ *
+ * This will install our callback handlers on the given transport.
+ */
+err_t msg_proto_create (struct msg_proto **proto_ptr, transport_t *transport, const struct msg_proto_callbacks *cb_tbl, void *cb_arg, error_t *err);
+
+/**
+ * Send a message to the other endpoint
+ */
+err_t msg_proto_send (struct msg_proto *proto, const void *data, size_t len, error_t *err);
+
+/**
+ * Destroy the protocol state and transport
+ */
+void msg_proto_destroy (struct msg_proto *proto);
+
+
+#endif /* MSG_PROTO_H */
--- a/src/test/assert.c	Fri May 08 02:51:20 2009 +0300
+++ b/src/test/assert.c	Sun May 10 23:37:43 2009 +0300
@@ -42,6 +42,15 @@
         _ASSERT_FUNC_FAIL("%s != NULL", dump_str(str));
 }
 
+void _assert_memcmp(_ASSERT_FUNC_ARGS, const char *is, const char *should_be, size_t len)
+{
+    if (!should_be && !is)
+        return;
+    
+    if (!is || memcmp(is, should_be, len))
+        _ASSERT_FUNC_FAIL("%s:%zu != %s", dump_strn(is, len), len, dump_strn(should_be, len));  
+}
+
 void _assert_success (_ASSERT_FUNC_ARGS, err_t err)
 {
     if (err)
@@ -54,9 +63,9 @@
         _ASSERT_FUNC_FAIL("err: %s != %s", dump_str(error_name(is)), dump_str(error_name(should_be)));
 }
 
-void _assert_error (_ASSERT_FUNC_ARGS, error_t *is, error_t *should_be)
+void _assert_error (_ASSERT_FUNC_ARGS, const error_t *is, const error_t *should_be)
 {
-    if (ERROR_CODE(is) != ERROR_CODE(should_be) || ERROR_EXTRA(is) != ERROR_EXTRA(should_be))
+    if (!error_cmp_eq(is, should_be))
         _ASSERT_FUNC_FAIL("error: %s != %s", dump_str(error_msg(is)), dump_str(error_msg(should_be)));
 
 }
--- a/src/test/assert.h	Fri May 08 02:51:20 2009 +0300
+++ b/src/test/assert.h	Sun May 10 23:37:43 2009 +0300
@@ -21,9 +21,14 @@
 #define _ASSERT_FUNC_CALL(func, ...) func(__func__, __FILE__, __LINE__, __VA_ARGS__)
 
 /**
- * Assert that the given condition is true, and fail with the given error if not
+ * Assert that the given condition is true
  */
-#define assert_true(cond, ...) fail_if(cond, __VA_ARGS__)
+#define assert_true(cond) fail_if(!(cond), "assert_true(%s)", #cond)
+
+/**
+ * Assert that the given condition is not true
+ */
+#define assert_false(cond) fail_if(cond, "assert_false(%s)", #cond)
 
 /**
  * Assert failure unconditionally
@@ -61,6 +66,12 @@
 void _assert_strnull (_ASSERT_FUNC_ARGS, const char *str);
 
 /**
+ * Assert that the two given buffers contain the same data using memcmp().
+ */
+#define assert_memcmp(is, should_be, len) _ASSERT_FUNC_CALL(_assert_memcmp, is, should_be, len)
+void _assert_memcmp(_ASSERT_FUNC_ARGS, const char *is, const char *should_be, size_t len);
+
+/**
  * Assert that the given error code is SUCCESS.
  */
 #define assert_success(err) _ASSERT_FUNC_CALL(_assert_success, err)
@@ -76,6 +87,6 @@
  * Assert that the given actual error \a is matches the expected error \a should_be
  */ 
 #define assert_error(is, should_be) _ASSERT_FUNC_CALL(_assert_error, is, should_be)
-void _assert_error (_ASSERT_FUNC_ARGS, error_t *is, error_t *should_be);
+void _assert_error (_ASSERT_FUNC_ARGS, const error_t *is, const error_t *should_be);
 
 #endif
--- a/src/test/fail.h	Fri May 08 02:51:20 2009 +0300
+++ b/src/test/fail.h	Sun May 10 23:37:43 2009 +0300
@@ -25,7 +25,7 @@
 void test_fail (const char *func, const char *file, int line, int skip, const char *fmt, ...) NORETURN;
 
 /**
- * Fail with a simply error message
+ * Fail with a simple error message
  */
 #define fail(...) test_fail(__func__, __FILE__, __LINE__, 0, __VA_ARGS__)
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/test/msg_proto.c	Sun May 10 23:37:43 2009 +0300
@@ -0,0 +1,209 @@
+#include "../msg_proto.h"
+#include "test.h"
+#include "transport.h"
+
+#include <stdint.h>
+
+struct test_msg_proto_ctx {
+    struct msg_proto *proto;
+
+    bool on_msg, on_error;
+
+    size_t msg_count;
+
+    const char *msg;
+    const error_t *err;
+};
+
+static void on_msg (struct msg_proto *proto, void *data, size_t len, void *arg)
+{
+    struct test_msg_proto_ctx *ctx = arg;
+
+    log_debug("data=%s, len=%zu", dump_strn(data, len), len);
+
+    // check state
+    assert(proto == ctx->proto);
+    assert(!ctx->on_msg);
+    assert(ctx->msg);
+    
+    // check it's the correct message data
+    assert_strncmp(data, ctx->msg, len);
+    assert_strlen(ctx->msg, len);
+
+    // mark
+    if (!ctx->msg_count || !--ctx->msg_count) {
+        ctx->msg = NULL;
+        ctx->on_msg = true;
+    }
+}
+
+static void on_error (struct msg_proto *proto, const error_t *err, void *arg)
+{
+    struct test_msg_proto_ctx *ctx = arg;
+
+    log_debug("err=%s", dump_str(error_msg(err)));
+
+    // check state
+    assert(proto == ctx->proto);
+    assert(!ctx->on_error);
+    assert(ctx->err);
+
+    // check it's the right error
+    assert_error(err, ctx->err);
+
+    // mark
+    ctx->err = NULL;
+    ctx->on_error = true;
+}
+
+static const struct msg_proto_callbacks _callbacks = {
+    .on_msg     = on_msg,
+    .on_error   = on_error,
+};
+
+struct msg_proto* setup_msg_proto (struct test_msg_proto_ctx *ctx, struct transport_test *tp)
+{
+    error_t err;
+
+    // init
+    memset(ctx, 0, sizeof(*ctx));
+
+    // create it
+    assert_success(msg_proto_create(&ctx->proto, transport_test_cast(tp), &_callbacks, ctx, &err));
+
+    return ctx->proto;
+}
+
+void test_msg_proto_recv (void)
+{
+    struct test_msg_proto_ctx _ctx, *ctx = &_ctx;
+    struct transport_test *tp = setup_transport_test();
+    struct msg_proto *proto = setup_msg_proto(&_ctx, tp);
+    
+    // test full message
+    ctx->msg = "xxxx";
+    transport_test_push_buf(tp, "\x00\x06xxxx", 6);
+    assert_true(ctx->on_msg);
+    ctx->on_msg = false;
+
+    // test partial message (header+body)
+    ctx->msg = "xxxx";
+    transport_test_push_buf(tp, "\x00\x06", 2);
+    assert_false(ctx->on_msg);
+    transport_test_push_buf(tp, "xxxx", 4);
+    assert_true(ctx->on_msg);
+    ctx->on_msg = false;
+
+    // test partial header
+    ctx->msg = "xxxx";
+    transport_test_push_buf(tp, "\x00", 1);
+    assert_false(ctx->on_msg);
+    transport_test_push_buf(tp, "\x06xxxx", 5);
+    assert_true(ctx->on_msg);
+    ctx->on_msg = false;
+
+    // test two messages in one
+    ctx->msg = "zzzz";
+    ctx->msg_count = 2;
+    transport_test_push_buf(tp, "\x00\x06zzzz\x00\x06zzzz", 6*2);
+    assert_true(ctx->on_msg);
+    ctx->on_msg = false;
+
+    // cleanup
+    msg_proto_destroy(proto);
+}
+
+void test_msg_proto_send (void)
+{
+    struct test_msg_proto_ctx _ctx, *ctx = &_ctx;
+    struct transport_test *tp = setup_transport_test();
+    struct msg_proto *proto = setup_msg_proto(&_ctx, tp);
+    error_t err;
+    
+    // test full message
+    ctx->msg = "xxxx";
+    assert_success(msg_proto_send(proto, ctx->msg, strlen(ctx->msg), &err));
+    assert_transport_data_buf(tp, "\x00\x06xxxx", 6);
+
+    // cleanup
+    msg_proto_destroy(proto);
+
+    // XXX: test write buffering
+}
+
+void test_msg_proto_error (void)
+{
+    struct test_msg_proto_ctx _ctx, *ctx = &_ctx;
+    struct transport_test *tp = setup_transport_test();
+    struct msg_proto *proto = setup_msg_proto(&_ctx, tp);
+    error_t err;
+
+    (void) ctx;
+    
+    // XXX: bad way to test uint16_t underflow
+    // assert_err(msg_proto_send(proto, NULL, ((1 << 16) - 1) - 3, &err), ERR_MISC);
+
+    // length overflow
+    assert_err(msg_proto_send(proto, NULL, ((1 << 16) - 1), &err), ERR_MISC);
+
+    // cleanup
+    msg_proto_destroy(proto);
+}
+
+void test_msg_proto_error_transport (void)
+{
+    struct test_msg_proto_ctx _ctx, *ctx = &_ctx;
+    struct transport_test *tp = setup_transport_test();
+    struct msg_proto *proto = setup_msg_proto(&_ctx, tp);
+    error_t err;
+    
+    // dummy error
+    SET_ERROR_STR(&err, ERR_MISC, "random fail");
+    ctx->err = &err;
+
+    transport_test_async_error(tp, &err);
+    assert_true(ctx->on_error);
+    ctx->on_error = false;
+
+    // cleanup
+    msg_proto_destroy(proto);
+}
+
+void test_msg_proto_error_read (void)
+{
+    struct test_msg_proto_ctx _ctx, *ctx = &_ctx;
+    struct transport_test *tp = setup_transport_test();
+    struct msg_proto *proto = setup_msg_proto(&_ctx, tp);
+    error_t err;
+    
+    // testing with EOF is easiest
+    SET_ERROR(&err, ERR_EOF);
+    ctx->err = &err;
+
+    transport_test_push_eof(tp);
+    assert_true(ctx->on_error);
+    ctx->on_error = false;
+
+    // cleanup
+    msg_proto_destroy(proto);
+}
+
+void test_msg_proto_error_recv_invalid (void)
+{    
+    struct test_msg_proto_ctx _ctx, *ctx = &_ctx;
+    struct transport_test *tp = setup_transport_test();
+    struct msg_proto *proto = setup_msg_proto(&_ctx, tp);
+    error_t err;
+
+    // recieve with invalid header
+    SET_ERROR_STR(&err, ERR_MISC, "message_header::len");
+    ctx->err = &err;
+
+    transport_test_push_buf(tp, "\x00\x01x", 3);
+    assert_true(ctx->on_error);
+    ctx->on_error = false;
+
+    // cleanup
+    msg_proto_destroy(proto);
+}
+
--- a/src/test/test_list.inc	Fri May 08 02:51:20 2009 +0300
+++ b/src/test/test_list.inc	Sun May 10 23:37:43 2009 +0300
@@ -25,6 +25,12 @@
 TEST ( fifo,                    TEST_OPTIONAL   )
 TEST ( tcp,                     0               )
 TEST ( line_proto,              0               )
+TEST ( msg_proto_recv,          0               )
+TEST ( msg_proto_send,          0               )
+TEST ( msg_proto_error,         0               )
+TEST ( msg_proto_error_transport, 0             )
+TEST ( msg_proto_error_read,    0               )
+TEST ( msg_proto_error_recv_invalid, 0          )
 TEST ( irc_cmd,                 0               )
 TEST ( irc_line_parse,          0               )
 TEST ( irc_line_build,          0               )
--- a/src/test/transport.c	Fri May 08 02:51:20 2009 +0300
+++ b/src/test/transport.c	Sun May 10 23:37:43 2009 +0300
@@ -37,6 +37,24 @@
     assert_err(-transport_read(transport, &buf, 1, &err), ERR_EOF);
 }
 
+void assert_transport_data_buf (struct transport_test *tp, const char *data, size_t len)
+{
+    // get the data out
+    char *out;
+    size_t out_len;
+    
+    transport_test_pull_buf(tp, &out, &out_len);
+    
+    log_debug("pull_buf: %s", dump_strn(out, out_len));
+    
+    // should be the same
+    assert_memcmp(out, data, out_len);
+    assert(out_len == len);
+
+    // cleanup
+    free(out);
+}
+
 void assert_transport_data (struct transport_test *tp, const char *fmt, ...)
 {
     char buf[TRANSPORT_TEST_FMT_MAX];
@@ -49,20 +67,8 @@
         FATAL("input too long: %zu bytes", len);
 
     va_end(vargs);
-
-    // get the data out
-    char *out;
-    
-    transport_test_pull_buf(tp, &out, &len);
     
-    log_debug("pull_buf: %s", dump_strn(out, len));
-    
-    // should be the same
-    assert_strncmp(out, buf, len);
-    assert_strlen(buf, len);
-
-    // cleanup
-    free(out);
+    assert_transport_data_buf(tp, buf, len);
 }
 
 struct transport_test* setup_transport_test (void)
--- a/src/test/transport.h	Fri May 08 02:51:20 2009 +0300
+++ b/src/test/transport.h	Sun May 10 23:37:43 2009 +0300
@@ -28,10 +28,15 @@
 void assert_transport_eof (transport_t *transport);
 
 /**
- * Compare the written data stored in the given transport_test with the string obtained using the given format and args.
+ * Compare the written data stored in the given transport_test with the given buffer.
  *
  * This will pull /all/ of the data in the transport.
  */
+void assert_transport_data_buf (struct transport_test *tp, const char *data, size_t len);
+
+/**
+ * Use assert_transport_data_buf() with the given formatted string.
+ */
 void assert_transport_data (struct transport_test *tp, const char *fmt, ...);
 
 /**
--- a/src/transport.h	Fri May 08 02:51:20 2009 +0300
+++ b/src/transport.h	Sun May 10 23:37:43 2009 +0300
@@ -46,6 +46,9 @@
  * enable the write event, and any pending write event is cancelled. If masked back in using transport_events(), the
  * write event will *not* be registered, so if you have pending data, do a transport_write() after enabling
  * TRANSPORT_WRITE.
+ *
+ * Note that transport_write() returning fewer bytes than given will *not* enable the write event! You must call
+ * transport_write() until you have either written all of your data, or it returns zero!
  */
 struct transport;
 
@@ -129,7 +132,8 @@
 /**
  * Write a series of bytes from the given \a buf (containing \a len bytes) to the transport. If succesfull, this
  * returns the number of bytes written (which may be less than \a len). If the transport is nonblocking, and the
- * operation would have blocked, no data will be written, and zero is returned.
+ * operation would have blocked, no data will be written, and zero is returned; in this case, the transport's write
+ * event is enabled (unless TRANSPORT_WRITE is masked out).
  *
  * On errors, this returns the negative error code, along with extended info via \a err.
  *
--- a/src/transport_test.c	Fri May 08 02:51:20 2009 +0300
+++ b/src/transport_test.c	Sun May 10 23:37:43 2009 +0300
@@ -352,6 +352,11 @@
     buf->write_vec = buf->vecs;
 }
 
+void transport_test_async_error (struct transport_test *tp, const error_t *err)
+{
+    transport_error(&tp->base, err);
+}
+
 void transport_test_destroy (struct transport_test *tp)
 {
     // free the buffers
--- a/src/transport_test.h	Fri May 08 02:51:20 2009 +0300
+++ b/src/transport_test.h	Sun May 10 23:37:43 2009 +0300
@@ -67,6 +67,11 @@
 void transport_test_pull_buf (struct transport_test *tp, char **buf_ptr, size_t *len_ptr);
 
 /**
+ * Send async error
+ */
+void transport_test_async_error (struct transport_test *tp, const error_t *err);
+
+/**
  * Destroy the transport buffer, releasing any buffers we allocated ourself
  */
 void transport_test_destroy (struct transport_test *tp);