--- a/src/CMakeLists.txt Tue Apr 28 20:27:45 2009 +0300
+++ b/src/CMakeLists.txt Tue Apr 28 22:08:59 2009 +0300
@@ -11,7 +11,7 @@
# define our source code modules
set (CORE_SOURCES error.c log.c str.c)
-set (IO_SOURCES transport.c transport_fd.c sock.c sock_tcp.c sock_gnutls.c)
+set (IO_SOURCES transport.c transport_fd.c sock.c sock_tcp.c sock_gnutls.c line_proto.c)
set (IRC_SOURCES irc_line.c irc_conn.c irc_net.c irc_chan.c chain.c irc_cmd.c irc_proto.c irc_client.c irc_user.c irc_queue.c irc_net_connect.c)
set (LUA_SOURCES nexus_lua.c lua_objs.c lua_config.c lua_irc.c lua_func.c lua_type.c)
set (CONSOLE_SOURCES console.c lua_console.c)
--- a/src/error.c Tue Apr 28 20:27:45 2009 +0300
+++ b/src/error.c Tue Apr 28 22:08:59 2009 +0300
@@ -90,10 +90,17 @@
{ ERR_LUA_ERR, "lua: error handling error", ERR_EXTRA_STR },
{ ERR_LUA_FILE, "lua: error loading file", ERR_EXTRA_STR },
{ _ERR_INVALID, NULL, 0 }
+
}, _pcre_error_desc[] = {
{ ERR_PCRE_COMPILE, "pcre_compile", ERR_EXTRA_STR },
{ ERR_PCRE_EXEC, "pcre_exec", ERR_EXTRA_STR },
{ _ERR_INVALID, NULL, 0 }
+}, _general_error_desc[] = {
+ { ERR_MISC, "miscellaneous error", ERR_EXTRA_STR },
+ { ERR_CMD_OPT, "invalid command line option", ERR_EXTRA_STR },
+ { ERR_DUP_NAME, "duplicate name", ERR_EXTRA_STR },
+ { ERR_EOF, "EOF", ERR_EXTRA_NONE },
+ { _ERR_INVALID, NULL, 0 }
};
/**
--- a/src/error.h Tue Apr 28 20:27:45 2009 +0300
+++ b/src/error.h Tue Apr 28 22:08:59 2009 +0300
@@ -72,6 +72,7 @@
_ERR_LIBEVENT = 0x000500,
ERR_EVENT_NEW,
ERR_EVENT_ADD,
+ ERR_EVENT_DEL,
/** Evsql errors */
_ERR_EVSQL = 0x000600,
@@ -131,10 +132,11 @@
/** General errors */
_ERR_GENERAL = 0xffff00,
- ERR_CMD_OPT,
+ ERR_MISC, ///< general error
+ ERR_CMD_OPT, ///< invalid commandline option
ERR_UNKNOWN,
- ERR_DUP_NAME,
- ERR_EOF,
+ ERR_DUP_NAME, ///< duplicate name
+ ERR_EOF, ///< end of file
};
--- a/src/irc_client.c Tue Apr 28 20:27:45 2009 +0300
+++ b/src/irc_client.c Tue Apr 28 22:08:59 2009 +0300
@@ -58,8 +58,8 @@
};
// combine _net_info and defaults to get net_info
- if (_net_info->raw_sock)
- RETURN_SET_ERROR_STR(err, ERR_IRC_NET_INFO, "raw_sock");
+ if (_net_info->transport)
+ RETURN_SET_ERROR_STR(err, ERR_IRC_NET_INFO, "transport");
if ((net_info.network = _net_info->network) == NULL)
RETURN_SET_ERROR_STR(err, ERR_IRC_NET_INFO, "network");
--- a/src/irc_conn.c Tue Apr 28 20:27:45 2009 +0300
+++ b/src/irc_conn.c Tue Apr 28 22:08:59 2009 +0300
@@ -292,7 +292,7 @@
{
// the line_proto
if (conn->lp)
- line_proto_release(conn->lp);
+ line_proto_destroy(conn->lp);
// the queue
if (conn->out_queue)
--- a/src/irc_net.h Tue Apr 28 20:27:45 2009 +0300
+++ b/src/irc_net.h Tue Apr 28 22:08:59 2009 +0300
@@ -35,8 +35,8 @@
/** Protocol registration info (nickname etc) */
struct irc_conn_register_info register_info;
- /** Raw socket to use, mainly for testing purposes */
- struct sock_stream *raw_sock;
+ /** Alternatively, raw transport to use, mainly for testing purposes */
+ transport_t *transport;
};
/**
--- a/src/irc_net_connect.c Tue Apr 28 20:27:45 2009 +0300
+++ b/src/irc_net_connect.c Tue Apr 28 22:08:59 2009 +0300
@@ -1,5 +1,6 @@
#include "irc_net_internal.h"
+#include "sock_tcp.h"
#include "log.h"
#include <time.h>
@@ -25,18 +26,18 @@
}
/**
- * We have succesfully established a connection to our server with the given sock, so create the irc_conn and bind it
- * to us.
+ * We have succesfully established a connection to our server with the given transport, so create the irc_conn and bind
+ * it to us.
*
- * If this fails, this will clean up any partial state, including sock.
+ * If this fails, this will clean up any partial state, including \a transport.
*/
-static err_t irc_net_connected (struct irc_net *net, struct sock_stream *sock, struct error_info *err)
+static err_t irc_net_connected (struct irc_net *net, transport_t *transport, struct error_info *err)
{
// mark state
net->connecting = false;
// create the irc connection state
- if (irc_conn_create(&net->conn, sock, &irc_net_conn_callbacks, net, err))
+ if (irc_conn_create(&net->conn, transport, &irc_net_conn_callbacks, net, err))
goto error;
// add our command handlers
@@ -55,8 +56,8 @@
error:
if (!net->conn) {
- // cleanup sock ourselves
- sock_stream_release(sock);
+ // cleanup transport ourselves
+ transport_destroy(transport);
} else {
// cleanup the partial stuff
@@ -72,26 +73,36 @@
* Our sock_*_connect_async callback. If the connect ended up failing, then try and reconnect later. Otherwise, do
* irc_net_connected().
*/
-static void irc_net_connect_cb (struct sock_stream *sock, struct error_info *conn_err, void *arg)
+static void irc_net_on_connect (transport_t *transport, void *arg)
{
struct irc_net *net = arg;
- struct error_info err;
+ error_t err;
- if (conn_err) {
- // attempt reconnect later
- log_err_info(conn_err, "connect failed");
-
- if (irc_net_connect(net, false, &err))
- log_err_info(&err, "unable to reconnect");
+ // yay
+ if (irc_net_connected(net, transport, &err))
+ log_err_info(&err, "irc_net_connected");
+}
- } else {
- // yay
- if (irc_net_connected(net, sock, &err))
- log_err_info(&err, "irc_net_connected");
+static void irc_net_on_connect_error (transport_t *transport, const error_t *conn_err, void *arg)
+{
+ struct irc_net *net = arg;
+ error_t err;
- }
+ // clean up
+ transport_destroy(transport);
+
+ // attempt reconnect later
+ log_err_info(conn_err, "connect failed");
+
+ if (irc_net_connect(net, false, &err))
+ log_err_info(&err, "unable to reconnect");
}
+static const struct transport_callbacks irc_net_transport_callbacks = {
+ .on_connect = irc_net_on_connect,
+ .on_error = irc_net_on_connect_error,
+};
+
/**
* The low-level connect() implementation, connects based on irc_net::info, calling irc_net_connected/irc_net_reconnect
* later if this succeeds.
@@ -99,20 +110,24 @@
static err_t irc_net_do_connect (struct irc_net *net, struct error_info *err)
{
struct irc_net_info *info = &net->info;
- struct sock_stream *sock = NULL;
+ struct transport_info transport_info = { &irc_net_transport_callbacks, net, TRANSPORT_READ | TRANSPORT_WRITE };
+ transport_t *transport = NULL;
// sanity check
assert(!net->connecting && !net->connected);
// connect based on what's known
- if (info->raw_sock) {
- log_debug("connected using raw socket: %p", info->raw_sock);
+ if (info->transport) {
+ log_debug("connected using raw transport: %p", info->transport);
- // direct sock_stream connection
- sock = info->raw_sock;
+ // direct transport connection
+ transport = info->transport;
- // then create the conn right away
- if (irc_net_connected(net, sock, err))
+ // invalidate it from info since it will get destroyed later
+ info->transport = NULL;
+
+ // then create the transport right away
+ if (irc_net_connected(net, transport, err))
goto error;
} else if (info->ssl_cred) {
@@ -123,20 +138,23 @@
log_debug("connecting to [%s]:%s using SSL", info->hostname, info->service);
// connect
- if (sock_ssl_connect_async(&sock, info->hostname, info->service, info->ssl_cred, &irc_net_connect_cb, net, err))
+ if (sock_ssl_connect(&transport_info, &transport, info->hostname, info->service, info->ssl_cred, err))
goto error;
net->connecting = true;
- } else {
+ } else if (info->hostname || info->service) {
log_debug("connecting to [%s]:%s", info->hostname, info->service);
// begin async connect
- if (sock_tcp_connect_async(&sock, info->hostname, info->service, &irc_net_connect_cb, net, err))
+ if (sock_tcp_connect(&transport_info, &transport, info->hostname, info->service, err))
goto error;
net->connecting = true;
+ } else {
+ RETURN_SET_ERROR_STR(err, ERR_MISC, "no connection info specified");
+
}
return SUCCESS;
@@ -161,9 +179,6 @@
log_err_info(&err, "unable to reconnect");
}
-// XXX: to get the ev_base
-#include "sock_internal.h"
-
/**
* Schedule a reconnection attempt in IRC_NET_RECONNECT_INTERVAL.
*/
@@ -204,6 +219,10 @@
return ERROR_CODE(err);
}
+// XXX: to get the ev_base
+#include "sock_internal.h"
+
+
err_t irc_net_connect_init (struct irc_net *net, struct error_info *err)
{
// look up the ev_base
--- a/src/line_proto.c Tue Apr 28 20:27:45 2009 +0300
+++ b/src/line_proto.c Tue Apr 28 22:08:59 2009 +0300
@@ -10,8 +10,8 @@
* Our state
*/
struct line_proto {
- /* The sock_stream we read/write with */
- struct sock_stream *sock;
+ /* The transport we read/write with */
+ transport_t *transport;
/* The incoming/outgoing line buffer */
char *in_buf, *out_buf;
@@ -36,9 +36,6 @@
void *cb_arg;
};
-// function prototypes
-static err_t line_proto_schedule_events (struct line_proto *lp, short what);
-
/**
* An error occured which we could not recover from; the line_proto should now be considered corrupt.
*
@@ -54,74 +51,54 @@
}
/**
- * Our sock_stream on_read handler
+ * Our transport_callbacks::on_read handler
*/
-static void line_proto_on_read (struct sock_stream *sock, void *arg)
+static void line_proto_on_read (transport_t *transport, void *arg)
{
struct line_proto *lp = arg;
char *line;
- (void) sock;
+ (void) transport;
// sanity-check
assert(lp->tail_offset < lp->buf_len);
do {
// attempt to read a line
- if (line_proto_recv(lp, &line)) {
+ if (line_proto_recv(lp, &line))
// faaail
return line_proto_set_error(lp);
- }
// got a line?
if (line)
lp->callbacks.on_line(line, lp->cb_arg);
} while (line);
-
- // reschedule
- if (line_proto_schedule_events(lp, EV_READ))
- line_proto_set_error(lp);
}
/*
* Signal for write
*/
-static void line_proto_on_write (struct sock_stream *sock, void *arg)
+static void line_proto_on_write (transport_t *transport, void *arg)
{
struct line_proto *lp = arg;
int ret;
- (void) sock;
+ (void) transport;
// just flush
- if ((ret = line_proto_flush(lp)) < 0) {
- // faaaail
- SET_ERROR(&lp->err, -ret);
-
+ if ((ret = line_proto_flush(lp)) < 0)
+ // faaail
return line_proto_set_error(lp);
- }
}
-/*
- * Schedule our sock_stream callback
- */
-static err_t line_proto_schedule_events (struct line_proto *lp, short what)
-{
- // just use sock_stream's interface
- if (sock_stream_event_enable(lp->sock, what))
- RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
-
- return SUCCESS;
-}
-
-struct sock_stream_callbacks line_proto_sock_stream_callbacks = {
+static const struct transport_callbacks line_proto_transport_callbacks = {
.on_read = &line_proto_on_read,
.on_write = &line_proto_on_write,
};
-err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, size_t buf_size,
- const struct line_proto_callbacks *callbacks, void *cb_arg, struct error_info *err)
+err_t line_proto_create (struct line_proto **lp_ptr, transport_t *transport, size_t buf_size,
+ const struct line_proto_callbacks *callbacks, void *cb_arg, error_t *err)
{
struct line_proto *lp;
@@ -134,17 +111,16 @@
JUMP_SET_ERROR(err, ERR_CALLOC);
// store
- lp->sock = sock;
+ lp->transport = transport;
lp->buf_len = buf_size;
lp->callbacks = *callbacks;
lp->cb_arg = cb_arg;
- // initialize event-based stuff
- if (
- sock_stream_event_init(sock, &line_proto_sock_stream_callbacks, lp)
- || line_proto_schedule_events(lp, EV_READ)
- )
- JUMP_SET_ERROR_INFO(err, &lp->err);
+ // setup the transport
+ transport_set_callbacks(transport, &line_proto_transport_callbacks, lp);
+
+ if ((ERROR_CODE(err) = transport_events(transport, TRANSPORT_READ | TRANSPORT_WRITE)))
+ goto error;
// return
*lp_ptr = lp;
@@ -154,7 +130,7 @@
error:
// cleanup the lp
if (lp)
- line_proto_release(lp);
+ line_proto_destroy(lp);
return ERROR_CODE(err);
}
@@ -237,9 +213,8 @@
return ERR_LINE_TOO_LONG;
// otherwise, read more data
- if ((ret = sock_stream_read(lp->sock, lp->in_buf + recv_offset, lp->buf_len - recv_offset)) < 0)
- // store and return NULL on errors
- RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
+ if ((ret = transport_read(lp->transport, lp->in_buf + recv_offset, lp->buf_len - recv_offset, &lp->err)) < 0)
+ return ERROR_CODE(&lp->err);
// EAGAIN?
if (ret == 0) {
@@ -271,11 +246,8 @@
return -ERR_LINE_TOO_LONG;
// try and write the line
- if ((ret = sock_stream_write(lp->sock, line, len)) < 0) {
- SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
-
+ if ((ret = transport_write(lp->transport, line, len, &lp->err)) < 0)
return -ERROR_CODE(&lp->err);
- }
// length of the sent data
ret_len = ret;
@@ -294,11 +266,7 @@
// update offset
lp->out_offset = trailing;
- // register for EV_WRITE
- if (line_proto_schedule_events(lp, EV_READ | EV_WRITE))
- return -ERROR_CODE(&lp->err);
-
- // buffered...
+ // buffered... transport should invoke on_write itself
return 1;
} else {
@@ -314,11 +282,8 @@
size_t ret_len;
// try and write the line
- if ((ret = sock_stream_write(lp->sock, lp->out_buf, lp->out_offset)) < 0) {
- SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
-
+ if ((ret = transport_write(lp->transport, lp->out_buf, lp->out_offset, &lp->err)) < 0)
return -ERROR_CODE(&lp->err);
- }
ret_len = ret;
@@ -340,10 +305,6 @@
lp->out_offset = remaining;
}
- // register for EV_WRITE
- if (line_proto_schedule_events(lp, EV_READ | EV_WRITE))
- return -ERROR_CODE(&lp->err);
-
// ok
return 1;
}
@@ -354,15 +315,15 @@
return &lp->err;
}
-void line_proto_release (struct line_proto *lp)
+void line_proto_destroy (struct line_proto *lp)
{
// free buffers
free(lp->in_buf);
free(lp->out_buf);
// socket?
- if (lp->sock)
- sock_stream_release(lp->sock);
+ if (lp->transport)
+ transport_destroy(lp->transport);
// free the state itself
free(lp);
--- a/src/line_proto.h Tue Apr 28 20:27:45 2009 +0300
+++ b/src/line_proto.h Tue Apr 28 22:08:59 2009 +0300
@@ -38,10 +38,10 @@
* @param err error information is returned via this pointer
*/
err_t line_proto_create (struct line_proto **lp_ptr, transport_t *transport, size_t buf_size,
- const struct line_proto_callbacks *callbacks, void *cb_arg, struct error_info *err);
+ const struct line_proto_callbacks *callbacks, void *cb_arg, error_t *err);
/**
- * Runs the socket recv() into our internal buffer. If a full line was received, a pointer to our internal bufffer is
+ * Runs transport_read() with our internal buffer. If a full line was received, a pointer to our internal bufffer is
* returned via *line_ptr, and we return SUCCESS. If we don't yet have a full line, and receiving more would block,
* NULL is returned via *line_ptr instead. Otherwise, nonzero error return code.
*
@@ -71,10 +71,10 @@
const struct error_info* line_proto_error (struct line_proto *lp);
/**
- * Release any allocated buffers, and the underlying sock_stream.
+ * Destroy any buffers and the underlying transport.
*
* This does not close the connection cleanly, and is intended for use to abort after errors.
*/
-void line_proto_release (struct line_proto *lp);
+void line_proto_destroy (struct line_proto *lp);
#endif /* LINE_PROTO_H */
--- a/src/log.c Tue Apr 28 20:27:45 2009 +0300
+++ b/src/log.c Tue Apr 28 22:08:59 2009 +0300
@@ -124,7 +124,7 @@
va_end(vargs);
}
-void _log_err (enum log_level level, struct error_info *err, const char *func, const char *format, ...)
+void _log_err (enum log_level level, const error_t *err, const char *func, const char *format, ...)
{
va_list vargs;
--- a/src/log.h Tue Apr 28 20:27:45 2009 +0300
+++ b/src/log.h Tue Apr 28 22:08:59 2009 +0300
@@ -76,7 +76,7 @@
/**
* Log a message with the given level, appending the formatted error message
*/
-void _log_err (enum log_level level, struct error_info *err, const char *func, const char *format, ...)
+void _log_err (enum log_level level, const error_t *err, const char *func, const char *format, ...)
__attribute__ ((format (printf, 4, 5)));
/**
--- a/src/sock_gnutls.c Tue Apr 28 20:27:45 2009 +0300
+++ b/src/sock_gnutls.c Tue Apr 28 22:08:59 2009 +0300
@@ -25,12 +25,12 @@
switch ((ret = gnutls_record_get_direction(sock->session))) {
case 0:
// read more data
- mask = EV_READ;
+ mask = TRANSPORT_READ;
break;
case 1:
// write buffer full
- mask = EV_WRITE;
+ mask = TRANSPORT_WRITE;
break;
default:
@@ -39,7 +39,7 @@
}
// do the enabling
- if ((ERROR_CODE(err) = transport_fd_enable(SOCK_GNUTLS_FD(sock), EV_READ)))
+ if ((ERROR_CODE(err) = transport_fd_enable(SOCK_GNUTLS_FD(sock), mask)))
return ERROR_CODE(err);
--- a/src/sock_tcp.c Tue Apr 28 20:27:45 2009 +0300
+++ b/src/sock_tcp.c Tue Apr 28 22:08:59 2009 +0300
@@ -16,7 +16,6 @@
-
/**
* Our transport_methods
*/
@@ -35,6 +34,7 @@
.methods = {
.read = transport_fd_methods_read,
.write = transport_fd_methods_write,
+ .events = transport_fd_methods_events,
.destroy = _sock_tcp_destroy,
},
};
@@ -138,16 +138,35 @@
}
/**
- * Our async connect operation has completed, clean up and call the user callback. The given \a err should be NULL for
- * successful completion, or the error for failures.
+ * Our async connect operation has completed, clean up, set up state for event-based operation with user callbacks, and
+ * invoke transport_connected().
+ *
+ * The given \a err should be NULL for successful completion, or the error for failures.
*/
static void sock_tcp_connect_done (struct sock_tcp *sock, struct error_info *err)
{
// cleanup
sock_tcp_connect_cleanup(sock);
+
+ if (err)
+ // passthrough errors
+ goto error;
- // ok, run callback
- transport_connected(SOCK_TCP_TRANSPORT(sock), err, false);
+ // install the transport_invoke callback handler
+ if ((ERROR_CODE(err) = transport_fd_setup(SOCK_TCP_FD(sock), transport_fd_callback_user, NULL)))
+ goto error;
+
+ // enable read unless masked out
+ if (SOCK_TCP_TRANSPORT(sock)->info.ev_mask & TRANSPORT_READ) {
+ if ((ERROR_CODE(err) = transport_fd_enable(SOCK_TCP_FD(sock), TRANSPORT_READ)))
+ goto error;
+ }
+
+ // ok, no error
+
+error:
+ // pass on to transport
+ transport_connected(SOCK_TCP_TRANSPORT(sock), IS_ERROR(err) ? err : NULL, false);
}
/**
@@ -212,7 +231,7 @@
goto error;
// enable for write
- if ((ERROR_CODE(err) = transport_fd_enable(SOCK_TCP_FD(sock), EV_WRITE)))
+ if ((ERROR_CODE(err) = transport_fd_enable(SOCK_TCP_FD(sock), TRANSPORT_WRITE)))
goto error;
} else {
--- a/src/transport.c Tue Apr 28 20:27:45 2009 +0300
+++ b/src/transport.c Tue Apr 28 22:08:59 2009 +0300
@@ -2,6 +2,9 @@
#include <assert.h>
+/*
+ * Internal API
+ */
void transport_init (transport_t *transport, const struct transport_type *type, const struct transport_info *info)
{
// not already bound
@@ -61,3 +64,71 @@
// invoke callback
transport->info.cb_tbl->on_error(transport, err, transport->info.cb_arg);
}
+
+/*
+ * Public API
+ */
+int transport_read (transport_t *transport, void *buf, size_t len, error_t *err)
+{
+ // not readable
+ if (!transport->type->methods.read)
+ return -1;
+
+ // proxy off to method handler
+ if (transport->type->methods.read(transport, buf, &len, err))
+ return -ERROR_CODE(err);
+
+ // return updated bytes-read len
+ return len;
+}
+
+int transport_write (transport_t *transport, const void *buf, size_t len, error_t *err)
+{
+ // XXX: not writeable
+ if (!transport->type->methods.write)
+ return -1;
+
+ // proxy off to method handler
+ if (transport->type->methods.write(transport, buf, &len, err))
+ return -ERROR_CODE(err);
+
+ // return updated bytes-written len
+ return len;
+}
+
+err_t transport_events (transport_t *transport, short mask)
+{
+ error_t err;
+
+ // notify transport
+ if (transport->type->methods.events) {
+ if (transport->type->methods.events(transport, mask, &err))
+ goto error;
+ }
+
+ // update the event mask
+ transport->info.ev_mask = mask;
+
+ // ok
+ return SUCCESS;
+
+error:
+ return ERROR_CODE(&err);
+}
+
+void transport_set_callbacks (transport_t *transport, const struct transport_callbacks *cb_tbl, void *cb_arg)
+{
+ transport->info.cb_tbl = cb_tbl;
+ transport->info.cb_arg = cb_arg;
+}
+
+void transport_destroy (transport_t *transport)
+{
+ // destroy the transport-specific stuff
+ if (transport->type->methods.destroy)
+ transport->type->methods.destroy(transport);
+
+ // then the transport itself
+ free(transport);
+}
+
--- a/src/transport.h Tue Apr 28 20:27:45 2009 +0300
+++ b/src/transport.h Tue Apr 28 22:08:59 2009 +0300
@@ -10,17 +10,54 @@
#include "error.h"
/**
- * Opaque transport state
+ * Opaque transport state handle.
+ *
+ * Transports are reliable byte streams, connected with some endpoint over some medium. Common implementations are
+ * e.g. TCP, SSL or fifo transports (using the OS file/socket API).
+ *
+ * Transports can be connected or unconnected. For synchronous opens (e.g. fifo_open_read()), the transport returned
+ * will already be connected, meaning that the transport_callbacks::on_connect callback is unused. For async connects
+ * such as sock_tcp_connect()/sock_ssl_connect(), the transport returned is *not* connected, and you must wait for
+ * transport_callbacks::on_connect to be called before being able to send/recieve data on the transport.
+ *
+ * Once you have an opened transport, sending and receiving data is simple - just call transport_read()/transport_write().
+ * These implement unbuffered I/O, so they may do partial reads/writes. In terms of the system read/write calls, the
+ * main difference is in the error return codes. On EOF, instead of returning zero, they return ERR_EOF (or
+ * ERR_WRITE_EOF for transport_write, for whoever knows what that means...). This means that when the underlying
+ * transport is unable to fufill the request due to lack of data/buffer space, these can return zero to signifiy s
+ * something simliar to EAGAIN.
+ *
+ * The transport API also implements non-blocking/event-based operation (usually on top of libevent), although at a
+ * slightly different level than the normal select/poll API. Instead of the user asking the transport to notify for
+ * read/write after transport_read/transport_write return zero, the transport will take care of this itself.
+ *
+ * Specifically, the user can supply a mask of events they are currently interested in. By default, this should be the
+ * full TRANSPORT_READ | TRANSPORT_WRITE, as the transport will take care of managing events by itself. If you wish to
+ * e.g. throttle read/write, you may set a different event mask using transport_events(), which will prevent the
+ * relevant callback from being triggered.
+ *
+ * For reads, the transport maintains a persistent read event, and will always call on_read when data is available on
+ * the socket (i.e. normal select() semantics). If masked out using transport_events(), there should be no event
+ * activity on the transport (i.e. the fd read event is removed).
+ *
+ * For writes, the transport maintains a write event that is disabled by default. If on_write returns zero, it will
+ * become enabled *once*, and consequently trigger transport_callbacks::on_write *once*, after which you must call
+ * transport_write() to possibly enable it again. If masked out using transport_events(), transport_write() will not
+ * enable the write event, and any pending write event is cancelled. If masked back in using transport_events(), the
+ * write event will *not* be registered, so if you have pending data, do a transport_write() after enabling
+ * TRANSPORT_WRITE.
*/
struct transport;
/**
- * Because it's so annoying seeing "struct" everywhere
+ * @see transport
*/
typedef struct transport transport_t;
/**
- * Callbacks for structs
+ * User callbacks for transports
+ *
+ * @see transport
*/
struct transport_callbacks {
/**
@@ -41,12 +78,16 @@
/**
* An asynchronous error has occured. This is only called for errors that occur while being called directly from
* the underlying event loop, and never from inside an API function.
+ *
+ * You must call transport_destroy to release the transport.
*/
void (*on_error) (transport_t *transport, const error_t *err, void *arg);
};
/**
* Bitmask of available events
+ *
+ * @see transport
*/
enum transport_event {
TRANSPORT_READ = 0x01,
@@ -55,6 +96,8 @@
/**
* User info required to build a transport
+ *
+ * @see transport
*/
struct transport_info {
/** The callbacks table */
@@ -101,9 +144,14 @@
int transport_write (transport_t *transport, const void *buf, size_t len, error_t *err);
/**
- * Change the mask of enabled events
+ * Change the mask of enabled events.
*/
-void transport_events (transport_t *transport, short mask);
+err_t transport_events (transport_t *transport, short mask);
+
+/**
+ * Install a new set of callback handlers, replacing the old ones.
+ */
+void transport_set_callbacks (transport_t *transport, const struct transport_callbacks *cb_tbl, void *cb_arg);
/**
* Close and destroy the transport immediately, severing any established connection rudely.
--- a/src/transport_fd.c Tue Apr 28 20:27:45 2009 +0300
+++ b/src/transport_fd.c Tue Apr 28 22:08:59 2009 +0300
@@ -74,6 +74,11 @@
// EAGAIN -> zero bytes
*len = 0;
+ if (transport->info.ev_mask & TRANSPORT_WRITE)
+ // enable the write event
+ if ((ERROR_CODE(err) = transport_fd_enable(fd, TRANSPORT_WRITE)))
+ return ERROR_CODE(err);
+
} else {
// normal -> bytes read
*len = ret;
@@ -82,9 +87,26 @@
return SUCCESS;
}
+err_t transport_fd_methods_events (transport_t *transport, short mask, error_t *err)
+{
+ struct transport_fd *fd = transport_check(transport, &transport_fd_type);
+
+ short _mask = 0;
+
+ // enable read as requested
+ if (mask & TRANSPORT_READ)
+ _mask |= TRANSPORT_READ;
+
+ // enable write if requested and it's currently enabled
+ if ((mask & TRANSPORT_WRITE) && event_pending(fd->ev_write, EV_WRITE, NULL))
+ _mask |= TRANSPORT_WRITE;
+
+ // set
+ return (ERROR_CODE(err) = transport_fd_events(fd, mask));
+}
+
void _transport_fd_destroy (transport_t *transport)
{
-
struct transport_fd *fd = transport_check(transport, &transport_fd_type);
transport_fd_destroy(fd);
@@ -93,10 +115,22 @@
const struct transport_methods transport_fd_methods = {
.read = transport_fd_methods_read,
.write = transport_fd_methods_write,
+ .events = transport_fd_methods_events,
.destroy = _transport_fd_destroy
};
/**
+ * Dummy callbacks
+ */
+void transport_fd_callback_user (struct transport_fd *fd, short what, void *arg)
+{
+ (void) arg;
+
+ // proxy
+ transport_invoke(TRANSPORT_FD_BASE(fd), what);
+}
+
+/**
* Function implementations
*/
void transport_fd_init (struct transport_fd *fd, struct event_base *ev_base, int _fd)
@@ -139,7 +173,7 @@
assert(!fd->ev_read && !fd->ev_write);
// create new events
- if ((fd->ev_read = event_new(fd->ev_base, fd->fd, EV_READ, transport_fd_on_event, fd)) == NULL)
+ if ((fd->ev_read = event_new(fd->ev_base, fd->fd, EV_READ | EV_PERSIST, transport_fd_on_event, fd)) == NULL)
goto err_event_add;
if ((fd->ev_write = event_new(fd->ev_base, fd->fd, EV_WRITE, transport_fd_on_event, fd)) == NULL)
@@ -166,22 +200,59 @@
// install the event handlers?
if (!fd->ev_read || !fd->ev_write)
- transport_fd_install(fd);
+ return transport_fd_install(fd);
+ else
+ return SUCCESS;
+}
+
+err_t transport_fd_enable (struct transport_fd *fd, short mask)
+{
+ // just add the appropriate events
+ if (mask & TRANSPORT_READ && event_add(fd->ev_read, NULL))
+ return ERR_EVENT_ADD;
+
+ if (mask & TRANSPORT_WRITE && event_add(fd->ev_write, NULL))
+ return ERR_EVENT_ADD;
+
+
+ return SUCCESS;
+}
+
+err_t transport_fd_disable (struct transport_fd *fd, short mask)
+{
+ if (mask & TRANSPORT_READ && event_del(fd->ev_read))
+ return ERR_EVENT_DEL;
+
+ if (mask & TRANSPORT_WRITE && event_del(fd->ev_write))
+ return ERR_EVENT_DEL;
return SUCCESS;
}
-err_t transport_fd_enable (struct transport_fd *fd, short mask)
+err_t transport_fd_events (struct transport_fd *fd, short mask)
{
- // just add the appropraite events
- if (mask & EV_READ && event_add(fd->ev_read, NULL))
- return ERR_EVENT_ADD;
-
- if (mask & EV_WRITE && event_add(fd->ev_write, NULL))
- return ERR_EVENT_ADD;
+ err_t err;
+ // enable/disable read
+ if (mask & TRANSPORT_READ)
+ err = event_add(fd->ev_read, NULL);
+ else
+ err = event_del(fd->ev_read);
+ if (err)
+ return err;
+
+ // enable/disable write
+ if (mask & TRANSPORT_WRITE)
+ err = event_add(fd->ev_write, NULL);
+ else
+ err = event_del(fd->ev_write);
+
+ if (err)
+ return err;
+
+ // ok
return SUCCESS;
}
@@ -228,16 +299,8 @@
void transport_fd_invoke (struct transport_fd *fd, short what)
{
- short _what = 0;
-
- if (what & EV_READ)
- _what |= TRANSPORT_READ;
-
- if (what & EV_WRITE)
- _what |= TRANSPORT_WRITE;
-
// invoke
- transport_invoke(TRANSPORT_FD_BASE(fd), _what);
+ transport_invoke(TRANSPORT_FD_BASE(fd), what);
}
err_t transport_fd_close (struct transport_fd *fd)
--- a/src/transport_fd.h Tue Apr 28 20:27:45 2009 +0300
+++ b/src/transport_fd.h Tue Apr 28 22:08:59 2009 +0300
@@ -19,7 +19,7 @@
/**
* Low-level callback
*/
-typedef void (*transport_fd_callback_func) (struct transport_fd *tp_fd, short what, void *arg);
+typedef void (*transport_fd_callback_func) (struct transport_fd *fd, short what, void *arg);
/**
* The fd-based transport implementation
@@ -61,9 +61,21 @@
err_t transport_fd_methods_read (transport_t *transport, void *buf, size_t *len, error_t *err);
/**
- * Implementation of transport_methods::write
+ * Implementation of transport_methods::write.
+ *
+ * If this gets EAGAIN, it will automatically enable the write event, unless masked out.
*/
-err_t transport_fd_methods_write (transport_t *transport, const void *buf, size_t *len, struct error_info *err);
+err_t transport_fd_methods_write (transport_t *transport, const void *buf, size_t *len, error_t *err);
+
+/**
+ * Implementation of transport_methods::events.
+ *
+ * For TRANSPORT_READ, this will simply apply enable/disable as given.
+ *
+ * For TRANSPORT_WRITE, the write event will only be enabled if given in the mask, *and* the ev_write event is currently
+ * active (via transport_fd_methods_write()); otherwise, the write event will not be enabled.
+ */
+err_t transport_fd_methods_events (transport_t *transport, short mask, error_t *err);
/**
* The transport_methods struct
@@ -71,6 +83,16 @@
extern const struct transport_methods transport_fd_methods;
/**
+ * A transport_fd_callback_func that simply invokes the transport_callback user functions.
+ *
+ * Register with a NULL cb_arg.
+ */
+void transport_fd_callback_user (struct transport_fd *fd, short what, void *arg);
+
+
+
+
+/**
* Initialize the transport_fd to use the given, connected fd, or TRANSPORT_FD_INVALID if we don't yet have an fd.
*
* It is an error to call this if the transport_fd already has an fd set
@@ -94,11 +116,21 @@
err_t transport_fd_setup (struct transport_fd *fd, transport_fd_callback_func cb_func, void *cb_arg);
/**
- * Enable the callbacks for the specified events, any of { EV_WRITE, EV_READ }.
+ * Enable the specified events, any of { EV_WRITE, EV_READ }.
*/
err_t transport_fd_enable (struct transport_fd *fd, short mask);
/**
+ * Disable the specifid events, any of { EV_WRITE, EV_READ }.
+ */
+err_t transport_fd_disable (struct transport_fd *fd, short mask);
+
+/**
+ * Set the enable/disable state of our events to the given mask of { EV_WRITE, EV_READ }.
+ */
+err_t transport_fd_events (struct transport_fd *fd, short mask);
+
+/**
* Remove any old event callback present, so it will not be called anymore.
*
* It is perfectly safe to call this without any callbacks installed.
--- a/src/transport_internal.h Tue Apr 28 20:27:45 2009 +0300
+++ b/src/transport_internal.h Tue Apr 28 22:08:59 2009 +0300
@@ -11,7 +11,9 @@
#include <stdbool.h>
/**
- * Method table for implementation stuff
+ * Method table for implementation stuff.
+ *
+ * Note that it is the transport's resposibility to implement the behaviour described in transport.h
*/
struct transport_methods {
/** For transport_read() */
@@ -20,6 +22,13 @@
/** For transport_write() */
err_t (*write) (transport_t *transport, const void *buf, size_t *len, error_t *err);
+ /**
+ * The mask of event flags will be set to the given mask if this method is succesfully.
+ *
+ * The old mask is still available in transport::info::ev_mask.
+ */
+ err_t (*events) (transport_t *transport, short mask, error_t *err);
+
/**
* Release the transport's internal state, but not the transport itself.
*
@@ -57,7 +66,7 @@
/** User info */
struct transport_info info;
-
+
/** Are we connected? */
bool connected;
};