fixed fifo new-transport
authorTero Marttila <terom@fixme.fi>
Tue, 28 Apr 2009 22:36:36 +0300
branchnew-transport
changeset 157 1e5674d0eec4
parent 156 6534a4ac957b
child 158 b5a5df4f4421
fixed fifo
src/CMakeLists.txt
src/fifo.c
src/fifo.h
src/line_proto.c
src/line_proto.h
src/modules/logwatch.h
src/modules/logwatch_source.c
src/sock_fifo.c
src/sock_tcp.c
src/transport.c
src/transport_fd.c
src/transport_fd.h
src/transport_internal.h
--- a/src/CMakeLists.txt	Tue Apr 28 22:08:59 2009 +0300
+++ b/src/CMakeLists.txt	Tue Apr 28 22:36:36 2009 +0300
@@ -11,7 +11,7 @@
 
 # define our source code modules
 set (CORE_SOURCES error.c log.c str.c)
-set (IO_SOURCES transport.c transport_fd.c sock.c sock_tcp.c sock_gnutls.c line_proto.c)
+set (IO_SOURCES transport.c transport_fd.c sock.c sock_tcp.c sock_gnutls.c fifo.c line_proto.c)
 set (IRC_SOURCES irc_line.c irc_conn.c irc_net.c irc_chan.c chain.c irc_cmd.c irc_proto.c irc_client.c irc_user.c irc_queue.c irc_net_connect.c)
 set (LUA_SOURCES nexus_lua.c lua_objs.c lua_config.c lua_irc.c lua_func.c lua_type.c)
 set (CONSOLE_SOURCES console.c lua_console.c)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/fifo.c	Tue Apr 28 22:36:36 2009 +0300
@@ -0,0 +1,153 @@
+
+#include "fifo.h"
+#include "transport_fd.h"
+
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <string.h>
+
+/**
+ * Our transport_type
+ */
+extern const struct transport_type fifo_type;
+
+/**
+ * The fifo state
+ */
+struct fifo {
+    /** The FD-based state */
+    struct transport_fd base_fd;
+    
+    /** Path to the fifo */
+    char *path;
+};
+
+/**
+ * Get a sock_fd pointer from a sock_fifo pointer
+ */
+#define FIFO_FD(sock_ptr) (&(sock_ptr)->base_fd)
+
+/**
+ * Get a sock_base pointer from a sock_fifo pointer
+ */
+#define FIFO_TRANSPORT(sock_ptr) TRANSPORT_FD_BASE(FIFO_FD(sock_ptr))
+
+
+/**
+ * (re)open the fifo, closing it if already open, and keeping any event callbacks registered.
+ */
+static err_t fifo_open (struct fifo *fifo, error_t *err)
+{
+    int _fd;
+
+    // open(2) the path in non-blocking read-only mode
+    if ((_fd = open(fifo->path, O_RDONLY | O_NONBLOCK)) < 0)
+        RETURN_SET_ERROR_ERRNO(err, ERR_OPEN);
+
+    // set the new fd
+    if ((ERROR_CODE(err) = transport_fd_set(FIFO_FD(fifo), _fd)))
+        return ERROR_CODE(err);
+    
+    // use default transport event-based behaviour
+    if ((ERROR_CODE(err) = transport_fd_defaults(FIFO_FD(fifo))))
+        return ERROR_CODE(err);
+
+    // ok
+    return SUCCESS;
+}
+
+/**
+ * Destroy the fifo, releasing all resources
+ */
+static void fifo_destroy (struct fifo *fifo)
+{
+    // destroy base
+    transport_fd_destroy(FIFO_FD(fifo));
+
+    // release the path
+    free(fifo->path);
+}
+
+/**
+ * sock_stream_methods::read implementation.
+ *
+ * Try and do a normal sock_fd_read call, but re-open with EAGAIN on EOF
+ */
+err_t fifo_read (transport_t *transport, void *buf, size_t *len, struct error_info *err)
+{
+    struct fifo *fifo = transport_check(transport, &fifo_type);
+
+    // trap READ_EOF
+    if (transport_fd_methods_read(transport, buf, len, err) != ERR_EOF)
+        return ERROR_CODE(err);
+    
+    // re-open it
+    // XXX: re-add events?
+    if (fifo_open(fifo, err))
+        goto error;
+
+    // ok, act as if it was EAGAIN
+    *len = 0;
+
+    return SUCCESS;
+
+error:
+    return ERROR_CODE(err);
+}
+
+/**
+ * sock_stream_methods::release implementation
+ */
+static void _fifo_destroy (transport_t *transport)
+{
+    struct fifo *fifo = transport_check(transport, &fifo_type);
+    
+    fifo_destroy(fifo);
+}
+
+/*
+ * Our sock_stream_type
+ */
+const struct transport_type fifo_type = {
+    .methods                = {
+        .read               = fifo_read,
+        .write              = NULL,
+        .events             = transport_fd_methods_events,
+        .destroy            = _fifo_destroy,
+    },
+};
+
+err_t fifo_open_read (struct transport_info *transport_info, transport_t **transport_ptr, struct event_base *ev_base,
+        const char *path, error_t *err)
+{
+    struct fifo *fifo;
+
+    // alloc
+    if ((fifo = calloc(1, sizeof(*fifo))) == NULL)
+        return SET_ERROR(err, ERR_CALLOC);
+
+    // copy the path
+    if ((fifo->path = strdup(path)) == NULL)
+        return SET_ERROR(err, ERR_STRDUP);
+
+    // init
+    transport_init(FIFO_TRANSPORT(fifo), &fifo_type, transport_info);
+    transport_fd_init(FIFO_FD(fifo), ev_base, TRANSPORT_FD_INVALID);
+    
+    // open the fifo
+    if (fifo_open(fifo, err))
+        goto error;
+
+    // ok
+    *transport_ptr = FIFO_TRANSPORT(fifo);
+
+    return SUCCESS;
+
+error:
+    // cleanup
+    fifo_destroy(fifo);
+
+    return ERROR_CODE(err);
+}
--- a/src/fifo.h	Tue Apr 28 22:08:59 2009 +0300
+++ b/src/fifo.h	Tue Apr 28 22:36:36 2009 +0300
@@ -3,16 +3,20 @@
 
 #include "transport.h"
 
+#include <event2/event.h>
+
 /**
- * A read-only "socket" based on a FIFO, this provides nonblocking read operations by re-opening the FIFO on EOF.
+ * A read-only transport based on a FIFO, this provides nonblocking read operations by re-opening the FIFO on EOF.
  *
  * The transport will be ready for use right away, transport_callbacks::on_connect will never be called.
  *
+ * @param transport_info the setup info required to create the transport
  * @param transport_ptr returned transport
  * @param path the path to the filesystem fifo object
  * @param err returned error info
  */
-err_t fifo_open_read (transport_t **transport_ptr, const char *path, error_t *err);
+err_t fifo_open_read (struct transport_info *transport_info, transport_t **transport_ptr, struct event_base *ev_base,
+        const char *path, error_t *err);
 
 
 #endif
--- a/src/line_proto.c	Tue Apr 28 22:08:59 2009 +0300
+++ b/src/line_proto.c	Tue Apr 28 22:36:36 2009 +0300
@@ -102,6 +102,12 @@
 {
     struct line_proto *lp;
 
+    // store
+    lp->transport = transport;
+    lp->buf_len = buf_size;
+    lp->callbacks = *callbacks;
+    lp->cb_arg = cb_arg;
+
     // allocate struct and buffers
     if (
             (lp = calloc(1, sizeof(*lp))) == NULL
@@ -110,12 +116,6 @@
     )
         JUMP_SET_ERROR(err, ERR_CALLOC);
 
-    // store
-    lp->transport = transport;
-    lp->buf_len = buf_size;
-    lp->callbacks = *callbacks;
-    lp->cb_arg = cb_arg;
-
     // setup the transport
     transport_set_callbacks(transport, &line_proto_transport_callbacks, lp);
     
--- a/src/line_proto.h	Tue Apr 28 22:08:59 2009 +0300
+++ b/src/line_proto.h	Tue Apr 28 22:36:36 2009 +0300
@@ -30,6 +30,8 @@
  *
  * The incoming lines are buffered in a buffer of \a buf_size bytes. This imposes a maximum limit on the line length.
  *
+ * In case of errors, \a transport will be destroyed in any case.
+ *
  * @param lp_ptr a pointer to the new line_proto will be returned via this pointer
  * @param transport the connected transport to use
  * @param buf_size the incoming/outgoing buffer size, should be enough to hold the biggest possible line
--- a/src/modules/logwatch.h	Tue Apr 28 22:08:59 2009 +0300
+++ b/src/modules/logwatch.h	Tue Apr 28 22:36:36 2009 +0300
@@ -5,8 +5,8 @@
  */
 #include <sys/queue.h>
 #include "../irc_chan.h"
-#include "../sock.h"
 #include "../line_proto.h"
+#include "../nexus.h"
 
 #include <event2/event.h>
 #include <pcre.h>
--- a/src/modules/logwatch_source.c	Tue Apr 28 22:08:59 2009 +0300
+++ b/src/modules/logwatch_source.c	Tue Apr 28 22:36:36 2009 +0300
@@ -1,4 +1,5 @@
 #include "logwatch.h"
+#include "../fifo.h"
 #include "../log.h"
 
 #include <stdlib.h>
@@ -42,23 +43,25 @@
 }
 
 /**
- * Initialize with the given sock
+ * Initialize with the given transport.
+ *
+ * In case of errors, this will free the source and transport.
  */
-static err_t logwatch_source_init (struct logwatch_source *source, struct logwatch *ctx, const char *name, struct sock_stream *stream, struct error_info *err)
+static err_t logwatch_source_init (struct logwatch_source *source, struct logwatch *ctx, const char *name, transport_t *transport, error_t *err)
 {
     // duplicate name?
     if (logwatch_source_lookup(ctx, name))
-        return SET_ERROR(err, ERR_DUP_NAME);
+        JUMP_SET_ERROR(err, ERR_DUP_NAME);
 
     // store
     source->ctx = ctx;
     
     // the name
     if ((source->name = strdup(name)) == NULL)
-        return SET_ERROR(err, ERR_STRDUP);
+        JUMP_SET_ERROR(err, ERR_STRDUP);
     
     // create the lp to wrap the sock
-    if (line_proto_create(&source->lp, stream, LOGWATCH_SOURCE_LINE_MAX, &lp_callbacks, source, err))
+    if (line_proto_create(&source->lp, transport, LOGWATCH_SOURCE_LINE_MAX, &lp_callbacks, source, err))
         goto error;
 
     // add to logwatch_sources
@@ -67,36 +70,35 @@
     // ok
     return SUCCESS;
 
-error:    
+error:
+    free(source);
+
     return ERROR_CODE(err);
 }
 
 struct logwatch_source* logwatch_open_fifo (struct logwatch *ctx, const char *path, struct error_info *err)
 {
     struct logwatch_source *source;
-    struct sock_stream *stream = NULL;
+    transport_t *transport = NULL;
 
     // alloc
     if ((source = calloc(1, sizeof(*source))) == NULL)
         JUMP_SET_ERROR(err, ERR_CALLOC);
     
     // open
-    if (fifo_open_read(&stream, path, err))
+    if (fifo_open_read(NULL, &transport, ctx->nexus->ev_base, path, err))
         goto error;
         
     // init
-    if (logwatch_source_init(source, ctx, path, stream, err))
-        goto error;
+    if (logwatch_source_init(source, ctx, path, transport, err))
+        return NULL;
 
     // ok
     return source;
 
-error:    
-    // cleanup
-    if (stream)
-        sock_stream_release(stream);
-    
+error:
     if (source)
+        // cleanup
         free(source);
 
     return NULL;
@@ -106,7 +108,7 @@
 {
     // release the line_proto
     if (source->lp)
-        line_proto_release(source->lp);
+        line_proto_destroy(source->lp);
         
     // remove from the list
     TAILQ_REMOVE(&source->ctx->sources, source, logwatch_sources);
--- a/src/sock_fifo.c	Tue Apr 28 22:08:59 2009 +0300
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,153 +0,0 @@
-/**
- * @file
- *
- * A read-only sock_stream implementation for linux fifo(7).
- */
-#include "sock_fd.h"
-
-#include <stdlib.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-#include <string.h>
-
-struct fifo {
-    /** The base fd operations */
-    struct sock_fd base_fd;
-    
-    /** The path to the fifo */
-    char *path;
-};
-
-/**
- * Get a sock_fd pointer from a sock_fifo pointer
- */
-#define FIFO_FD(sock_ptr) (&(sock_ptr)->base_fd)
-
-/**
- * Get a sock_base pointer from a sock_fifo pointer
- */
-#define FIFO_BASE(sock_ptr) SOCK_FD_BASE(FIFO_FD(sock_ptr))
-
-/**
- * Get the sock_stream.err pointer from a sock_fifo pointer
- */
-#define FIFO_ERR(sock_ptr) SOCK_ERR(FIFO_BASE(sock_ptr))
-
-
-
-/**
- * (re)open the fifo, closing it if already open, and keeping any event callbacks registered.
- */
-static err_t fifo_open (struct fifo *fifo, struct error_info *err)
-{
-    int fd;
-
-    // open(2) the path in non-blocking mode
-    // XXX: hardoded read-only
-    if ((fd = open(fifo->path, O_RDONLY | O_NONBLOCK)) < 0)
-        RETURN_SET_ERROR_ERRNO(err, ERR_OPEN);
-
-    // set the new fd
-    if ((ERROR_CODE(err) = sock_fd_set(FIFO_FD(fifo), fd)))
-        return ERROR_CODE(err);
-    
-    // ok
-    return SUCCESS;
-}
-
-/**
- * Destroy the fifo, releasing all resources
- */
-static void fifo_destroy (struct fifo *fifo)
-{
-    // close if open
-    if (FIFO_FD(fifo)->fd >= 0)
-        sock_fd_close(FIFO_FD(fifo));
-
-    // release the path
-    free(fifo->path);
-    free(fifo);
-}
-
-/**
- * sock_stream_methods::read implementation.
- *
- * Try and do a normal sock_fd_read call, but re-open with EAGAIN on EOF
- */
-err_t fifo_read (struct sock_stream *base_sock, void *buf, size_t *len, struct error_info *err)
-{
-    struct fifo *fifo = SOCK_FROM_BASE(base_sock, struct fifo);
-
-    // passthru ERR_READ_EOF unless it's READ_EOF
-    if (sock_fd_read(base_sock, buf, len, err) != ERR_READ_EOF)
-        return ERROR_CODE(err);
-    
-    // re-open it
-    // XXX: re-add events?
-    if (fifo_open(fifo, err))
-        goto error;
-
-    // ok, act as if it was EAGAIN
-    *len = 0;
-
-    return SUCCESS;
-
-error:
-    return ERROR_CODE(err);
-}
-
-/**
- * sock_stream_methods::release implementation
- */
-static void fifo_release (struct sock_stream *base_sock)
-{
-    struct fifo *fifo = SOCK_FROM_BASE(base_sock, struct fifo);
-    
-    fifo_destroy(fifo);
-}
-
-/*
- * Our sock_stream_type
- */
-static struct sock_stream_type fifo_stream_type = {
-    .methods                = {
-        .read               = &fifo_read,
-        .write              = NULL,
-        .event_init         = &sock_fd_event_init,
-        .event_enable       = &sock_fd_event_enable,
-        .release            = &fifo_release,
-    },
-};
-
-err_t fifo_open_read (struct sock_stream **stream_ptr, const char *path, struct error_info *err)
-{
-    struct fifo *fifo;
-
-    // alloc
-    if ((fifo = calloc(1, sizeof(*fifo))) == NULL)
-        return SET_ERROR(err, ERR_CALLOC);
-
-    // copy the path
-    if ((fifo->path = strdup(path)) == NULL)
-        return SET_ERROR(err, ERR_STRDUP);
-
-    // init
-    sock_stream_init(FIFO_BASE(fifo), &fifo_stream_type, NULL, NULL);
-    sock_fd_init(FIFO_FD(fifo), -1);
-
-    // open the fifo
-    if (fifo_open(fifo, err))
-        goto error;
-
-    // ok
-    *stream_ptr = FIFO_BASE(fifo);
-
-    return SUCCESS;
-
-error:
-    // cleanup
-    fifo_destroy(fifo);
-
-    return ERROR_CODE(err);
-}
--- a/src/sock_tcp.c	Tue Apr 28 22:08:59 2009 +0300
+++ b/src/sock_tcp.c	Tue Apr 28 22:36:36 2009 +0300
@@ -143,30 +143,26 @@
  *
  * The given \a err should be NULL for successful completion, or the error for failures.
  */
-static void sock_tcp_connect_done (struct sock_tcp *sock, struct error_info *err)
+static void sock_tcp_connect_done (struct sock_tcp *sock, struct error_info *conn_err)
 {
+    error_t err;
+
     // cleanup
     sock_tcp_connect_cleanup(sock);
 
-    if (err)
+    if (conn_err)
         // passthrough errors
-        goto error;
+        JUMP_SET_ERROR_INFO(&err, conn_err);
     
-    // install the transport_invoke callback handler
-    if ((ERROR_CODE(err) = transport_fd_setup(SOCK_TCP_FD(sock), transport_fd_callback_user, NULL)))
+    // set up for default transport event-based operation
+    if ((ERROR_CODE(&err) = transport_fd_defaults(SOCK_TCP_FD(sock))))
         goto error;
 
-    // enable read unless masked out
-    if (SOCK_TCP_TRANSPORT(sock)->info.ev_mask & TRANSPORT_READ) {
-        if ((ERROR_CODE(err) = transport_fd_enable(SOCK_TCP_FD(sock), TRANSPORT_READ)))
-            goto error;
-    }
-
     // ok, no error
 
 error:                
     // pass on to transport
-    transport_connected(SOCK_TCP_TRANSPORT(sock), IS_ERROR(err) ? err : NULL, false);
+    transport_connected(SOCK_TCP_TRANSPORT(sock), IS_ERROR(&err) ? &err : NULL, false);
 }
 
 /**
--- a/src/transport.c	Tue Apr 28 22:08:59 2009 +0300
+++ b/src/transport.c	Tue Apr 28 22:36:36 2009 +0300
@@ -12,7 +12,9 @@
 
     // store
     transport->type = type;
-    transport->info = *info;
+
+    if (info)
+        transport->info = *info;
 }
 
 void* transport_check (transport_t *transport, const struct transport_type *type)
--- a/src/transport_fd.c	Tue Apr 28 22:08:59 2009 +0300
+++ b/src/transport_fd.c	Tue Apr 28 22:36:36 2009 +0300
@@ -280,6 +280,27 @@
     fd->cb_func = fd->cb_arg = NULL;
 }
 
+err_t transport_fd_defaults (struct transport_fd *fd)
+{
+    error_t err;
+
+    // install the transport_invoke callback handler
+    if ((ERROR_CODE(&err) = transport_fd_setup(fd, transport_fd_callback_user, NULL)))
+        goto error;
+
+    // enable read unless masked out
+    if (TRANSPORT_FD_BASE(fd)->info.ev_mask & TRANSPORT_READ) {
+        if ((ERROR_CODE(&err) = transport_fd_enable(fd, TRANSPORT_READ)))
+            goto error;
+    }
+    
+    // ok
+    return SUCCESS;
+
+error:
+    return ERROR_CODE(&err);
+}
+
 err_t transport_fd_set (struct transport_fd *fd, int _fd)
 {
     assert(_fd == TRANSPORT_FD_INVALID || _fd >= 0);
--- a/src/transport_fd.h	Tue Apr 28 22:08:59 2009 +0300
+++ b/src/transport_fd.h	Tue Apr 28 22:36:36 2009 +0300
@@ -96,7 +96,7 @@
  * Initialize the transport_fd to use the given, connected fd, or TRANSPORT_FD_INVALID if we don't yet have an fd.
  *
  * It is an error to call this if the transport_fd already has an fd set
- *
+ i*
  * @param fd the transport_fd state
  * @param ev_base the libevent base to use
  * @param _fd the OS file descriptor, or TRANSPORT_FD_INVALID
@@ -116,17 +116,17 @@
 err_t transport_fd_setup (struct transport_fd *fd, transport_fd_callback_func cb_func, void *cb_arg);
 
 /**
- * Enable the specified events, any of { EV_WRITE, EV_READ }.
+ * Enable the specified events, any of { TRANSPORT_READ, TRANSPORT_WRITE }.
  */
 err_t transport_fd_enable (struct transport_fd *fd, short mask);
 
 /**
- * Disable the specifid events, any of { EV_WRITE, EV_READ }.
+ * Disable the specifid events, any of { TRANSPORT_READ, TRANSPORT_WRITE }.
  */
 err_t transport_fd_disable (struct transport_fd *fd, short mask);
 
 /**
- * Set the enable/disable state of our events to the given mask of { EV_WRITE, EV_READ }.
+ * Set the enable/disable state of our events to the given mask.
  */
 err_t transport_fd_events (struct transport_fd *fd, short mask);
 
@@ -138,6 +138,12 @@
 void transport_fd_clear (struct transport_fd *fd);
 
 /**
+ * Setup and enable the default event handlers for transport operation, that is, use transport_fd_callback_user as the 
+ * callback and enable TRANSPORT_READ, unless masked out.
+ */
+err_t transport_fd_defaults (struct transport_fd *fd);
+
+/**
  * Replace the old fd with a new one, maintaining any event callbacks set with transport_fd_callback. If any events were
  * enabled before, they are not enabled anymore.
  */
--- a/src/transport_internal.h	Tue Apr 28 22:08:59 2009 +0300
+++ b/src/transport_internal.h	Tue Apr 28 22:36:36 2009 +0300
@@ -74,6 +74,9 @@
 /**
  * Bind the given transport to the given type with the given user info.
  *
+ * \a info may be given as NULL to not have any callbacks, but this will crash if any transport_* is called before
+ * transport_set_callbacks().
+ *
  * It is a bug to call this with a transport that is already bound.
  */
 void transport_init (transport_t *transport, const struct transport_type *type, const struct transport_info *info);