implement irc_queue, with some basic functionality tests
authorTero Marttila <terom@fixme.fi>
Mon, 30 Mar 2009 16:31:24 +0300
changeset 90 9d489b1039b2
parent 89 68345a9b99a3
child 91 bca23cbe1dce
implement irc_queue, with some basic functionality tests
TODO
src/CMakeLists.txt
src/irc_line.h
src/irc_queue.c
src/irc_queue.h
src/line_proto.h
src/test.c
--- a/TODO	Mon Mar 30 14:45:14 2009 +0300
+++ b/TODO	Mon Mar 30 16:31:24 2009 +0300
@@ -9,9 +9,6 @@
 irc_net:
  * reconnect, maybe cycling servers?
 
-irc_chan:
- * handle KICK
-
 config:
  * A more advanced structured value parser that can then handle all the various configuration values sanely
 
--- a/src/CMakeLists.txt	Mon Mar 30 14:45:14 2009 +0300
+++ b/src/CMakeLists.txt	Mon Mar 30 16:31:24 2009 +0300
@@ -10,7 +10,7 @@
 # define our source code modules
 set (CORE_SOURCES error.c log.c)
 set (SOCK_SOURCES sock.c sock_tcp.c sock_gnutls.c sock_test.c line_proto.c)
-set (IRC_SOURCES irc_line.c irc_conn.c irc_net.c irc_chan.c chain.c irc_cmd.c irc_proto.c irc_client.c irc_user.c)
+set (IRC_SOURCES irc_line.c irc_conn.c irc_net.c irc_chan.c chain.c irc_cmd.c irc_proto.c irc_client.c irc_user.c irc_queue.c)
 
 set (NEXUS_SOURCES nexus.c ${CORE_SOURCES} ${SOCK_SOURCES} ${IRC_SOURCES} signals.c module.c config.c)
 set (TEST_SOURCES test.c ${CORE_SOURCES} ${SOCK_SOURCES} ${IRC_SOURCES})
--- a/src/irc_line.h	Mon Mar 30 14:45:14 2009 +0300
+++ b/src/irc_line.h	Mon Mar 30 16:31:24 2009 +0300
@@ -10,9 +10,9 @@
 #include "error.h"
 
 /**
- * The maximum length of a line, without terminating CRLF
+ * The maximum length of a line, without terminating CRLF, but including NUL
  */
-#define IRC_LINE_MAX 510
+#define IRC_LINE_MAX (510 + 1)
 
 /**
  * The maximum number of arguments for a single command
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/irc_queue.c	Mon Mar 30 16:31:24 2009 +0300
@@ -0,0 +1,228 @@
+#include "irc_queue.h"
+#include "log.h"
+
+// XXX: for ev_base
+#include "sock_internal.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+
+/**
+ * Send a formatted line on the line_proto, and apply the IRC_QUEUE_PENALTY to the timer, resetting it to the current
+ * time if it's fallen behind.
+ */
+static err_t irc_queue_send_buf (struct irc_queue *queue, const char *line_buf)
+{
+    time_t now = time(NULL);
+    err_t err;
+    int ret;
+    
+    // XXX: output buffering?
+    if ((ret = line_proto_send(queue->lp, line_buf)) < 0)
+        return (err = -ret);
+    
+    // reset timer to current time if it's fallen behind
+    if (queue->timer < now)
+        queue->timer = now;
+
+    // apply penalty
+    queue->timer += IRC_QUEUE_PENALTY;
+    
+    // ok
+    return SUCCESS;
+}
+
+/**
+ * Pump the queue, sending the next message. If it is given explicitly, then the given message is "pumped", and it is
+ * assumed to not be in the queue_list. Otherwise, the next entry is taken from the queue.
+ *
+ * If the queue is empty and no entry is given, does nothing.
+ */
+static err_t irc_queue_pump (struct irc_queue *queue, struct irc_queue_entry *next)
+{
+    err_t err;
+
+    // pop the next entry?
+    if (next == NULL) {
+        // take it from the head of the list
+        next = TAILQ_FIRST(&queue->list);
+    
+        // nothing to do?
+        if (!next)
+            return SUCCESS;
+        
+        // and then remove it
+        TAILQ_REMOVE(&queue->list, next, queue_list);
+    }
+
+    // send it
+    if ((err = irc_queue_send_buf(queue, next->line_buf)))
+        goto error;
+    
+    // ok, release it
+    free(next);
+    
+    // done
+    return SUCCESS;
+
+error:
+    // hmm... re-insert and return error, we can try again later
+    TAILQ_INSERT_HEAD(&queue->list, next, queue_list);
+
+    return err;    
+}
+
+/**
+ * Schedule the queue's timer for execution once the timer has expired, or immediately if it's not full. If next is
+ * given, it should be the next entry to send, and not part of the queue_list.
+ */
+static err_t irc_queue_schedule (struct irc_queue *queue)
+{
+    time_t now = time(NULL);
+    struct timeval tv;
+    
+    // calculate the timeout
+    int timeout = (queue->timer - (now + IRC_QUEUE_WINDOW));
+    
+    // setup the timeout, zero if we don't actually need to wait...
+    tv.tv_sec = timeout > 0 ? timeout : 0;
+    tv.tv_usec = 0;
+
+    // setup the timer event
+    if (evtimer_add(queue->ev, &tv))
+        return ERR_EVENT_ADD;
+
+    // ok
+    return SUCCESS;
+}
+
+/**
+ * Our timer callback
+ */
+static void irc_queue_timer (int fd, short what, void *arg)
+{
+    time_t now = time(NULL);
+    struct irc_queue *queue = arg;
+    err_t err;
+
+    (void) fd;
+    (void) what;
+
+    // pump the queue until our timer is full again, or the queue is empty
+    while (queue->timer <= now + IRC_QUEUE_WINDOW && !TAILQ_EMPTY(&queue->list)) {
+        if ((err = irc_queue_pump(queue, NULL))) {
+            log_warn("irc_queue_pump: %s", error_name(err));
+            break;
+        }
+    }
+
+    // reschedule if needed
+    if (!TAILQ_EMPTY(&queue->list)) {
+        if ((err = irc_queue_schedule(queue))) {
+            log_err(err, "irc_queue_scheulde");
+        }
+    }
+}
+
+err_t irc_queue_create (struct irc_queue **queue_ptr, struct line_proto *lp, struct error_info *err)
+{
+    struct irc_queue *queue;
+
+    // alloc
+    if ((queue = calloc(1, sizeof(*queue))) == NULL)
+        return SET_ERROR(err, ERR_CALLOC);
+
+    // create the timer event
+    // XXX: using the sock module ev_base
+    if ((queue->ev = evtimer_new(_sock_stream_ctx.ev_base, &irc_queue_timer, queue)) == NULL)
+        JUMP_SET_ERROR(err, ERR_EVENT_NEW);
+
+    // initialize
+    queue->lp = lp;
+    queue->timer = time(NULL);
+    TAILQ_INIT(&queue->list); 
+
+    // ok
+    *queue_ptr = queue;
+
+    return SUCCESS;
+
+error:
+    // cleanup
+    free(queue);
+
+    return ERROR_CODE(err);    
+}
+
+/**
+ * Attempt to send a irc_line directly on the queue's line_proto, otherwise enqueueing it for later transmission.
+ */
+static err_t irc_queue_send_direct (struct irc_queue *queue, const struct irc_line *line)
+{
+    char line_buf[IRC_LINE_MAX + 2];
+    err_t err;
+
+    // format
+    if ((err = irc_line_build(line, line_buf)))
+        return err;
+    
+    // add CRLF
+    strcat(line_buf, "\r\n");
+
+    // send
+    // XXX: handle send-buffer-full by enqueuing it after all
+    return irc_queue_send_buf(queue, line_buf);
+}
+
+/**
+ * Enqueue a irc_line onto the queue's list, and schedule the timer for execution
+ */
+static err_t irc_queue_put (struct irc_queue *queue, const struct irc_line *line)
+{
+    struct irc_queue_entry *entry;
+    err_t err;
+
+    // alloc
+    if ((entry = calloc(1, sizeof(*entry))) == NULL)
+        return ERR_CALLOC;
+
+    // format
+    if ((err = irc_line_build(line, entry->line_buf)))
+        goto error;
+
+    // add CRLF
+    strcat(entry->line_buf, "\r\n");
+   
+    // then re-schedule the queue
+    if ((err = irc_queue_schedule(queue)))
+        goto error;
+
+    // append to end of list
+    TAILQ_INSERT_TAIL(&queue->list, entry, queue_list);
+ 
+    // ok
+    return SUCCESS;
+
+error:
+    // cleanup
+    free(entry);
+
+    return err;    
+}
+
+err_t irc_queue_process (struct irc_queue *queue, const struct irc_line *line)
+{
+    // current time
+    time_t now = time(NULL);
+
+    if (queue->timer < now + IRC_QUEUE_WINDOW) {
+        // timer is OK, send directly
+        return irc_queue_send_direct(queue, line);
+    
+    } else {
+        // enqueue for later transmission
+        return irc_queue_put(queue, line);
+    }
+}
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/irc_queue.h	Mon Mar 30 16:31:24 2009 +0300
@@ -0,0 +1,84 @@
+#ifndef IRC_QUEUE_H
+#define IRC_QUEUE_H
+
+/**
+ * @file
+ *
+ * Ratelimited queue of outgoing irc_line's for use with irc_conn.
+ *
+ * This implements the basic flood control algorithm as described in RFC1459 section 8.10.
+ */
+#include "irc_line.h"
+#include "line_proto.h"
+#include "error.h"
+
+#include <sys/queue.h>
+#include <event2/event.h>
+
+/**
+ * Number of seconds of penalty applied for each message
+ */
+#define IRC_QUEUE_PENALTY 2
+
+/**
+ * Maximum allowed burst, in seconds
+ */
+#define IRC_QUEUE_WINDOW 10
+
+/**
+ * An enqueued irc_line for later delivery
+ */
+struct irc_queue_entry {
+    /** The formatted irc_line data, including terminating CRLF */
+    char line_buf[IRC_LINE_MAX + 2];
+
+    /** Our entry in the irc_queue list */
+    TAILQ_ENTRY(irc_queue_entry) queue_list;
+};
+
+/**
+ * The queue state and timing algorithm, as described in the RFC:
+ *  * A line will be sent directly if the current timer is less than IRC_QUEUE_WINDOW seconds into the future, and
+ *    IRC_QUEUE_PENALTY seconds are added on to the timer.
+ *  * Otherwise, the line is stored as a irc_queue_entry and enqueued for later transmission, once the timer value
+ *    drops below IRC_QUEUE_WINDOW seconds into the future.
+ *  * The above is repeated for each dequeued line.
+ *
+ * Additionally, if sending a line fails due to the line_proto socket buffer being full, this also handles this case
+ * by queueing the line for later sending.
+ *
+ * Note that the timing behaviour of this is rather unexact - the socket layer may introduce its own delays/jitter which
+ * we can't measure or control here, but at least we make an effort. This should work OK as long as the outgoing lines
+ * don't accumulate in the socket's write buffer too much. XXX: maybe tweak socket params to try and "disable" 
+ * buffering on our end to improve accuracy?
+ */
+struct irc_queue {
+    /** The line_proto that we can send the messages out on */
+    struct line_proto *lp;
+
+    /** Our timeout used for delaying messages */
+    struct event *ev;
+
+    /** Current "message timer" value, may be up to IRC_QUEUE_WINDOW seconds in the future */
+    time_t timer;
+
+    /** The actual queue of messages */
+    TAILQ_HEAD(irc_queue_entry_list, irc_queue_entry) list;
+};
+
+/**
+ * Create a new irc_queue for use with the given line_proto
+ */
+err_t irc_queue_create (struct irc_queue **queue_ptr, struct line_proto *lp, struct error_info *err);
+
+/**
+ * Process a line, either sending it directly, or enqueueing it, based on the timer state.
+ */
+err_t irc_queue_process (struct irc_queue *queue, const struct irc_line *line);
+
+/**
+ * Destroy the irc_queue, releasing all queued lines
+ */
+void irc_queue_destroy (struct irc_queue *queue);
+
+#endif /* IRC_QUEUE_H */
--- a/src/line_proto.h	Mon Mar 30 14:45:14 2009 +0300
+++ b/src/line_proto.h	Mon Mar 30 16:31:24 2009 +0300
@@ -52,7 +52,7 @@
 err_t line_proto_recv (struct line_proto *lp, char **line_ptr);
 
 /**
- * Write a single line to the sock_stream, buffering any incomplete fragment that remains unset. Returns zero if the
+ * Write a single line to the sock_stream, buffering any incomplete fragment that remains unsent. Returns zero if the
  * line was succesfully sent, >0 if it was only partially sent, or -err on errors.
  *
  * The given line should already include the terminating '\r\n' character sequence.
--- a/src/test.c	Mon Mar 30 14:45:14 2009 +0300
+++ b/src/test.c	Mon Mar 30 16:31:24 2009 +0300
@@ -3,6 +3,7 @@
  */
 #include "sock_test.h"
 #include "line_proto.h"
+#include "irc_queue.h"
 #include "irc_conn.h"
 #include "irc_net.h"
 #include "log.h"
@@ -18,6 +19,15 @@
 #define DUMP_STR_COUNT 8
 #define DUMP_STR_TAIL 10
 
+/**
+ * Global test-running state
+ */
+struct test_ctx {
+    /** The event_base that we have setup */
+    struct event_base *ev_base;
+
+} _test_ctx;
+
 char *dump_str_append (char *buf, const char *str)
 {
     while (*str)
@@ -257,6 +267,20 @@
 }
 
 /**
+ * Setup the global sock_stream state
+ */
+struct event_base* setup_sock (void)
+{
+    struct event_base *ev_base;
+    struct error_info err;
+
+    assert((ev_base = event_base_new()));
+    assert_success(sock_init(ev_base, &err));
+
+    return ev_base;
+}
+
+/**
  * Create an empty sock_test
  */
 struct sock_test* setup_sock_test (void)
@@ -416,6 +440,63 @@
     line_proto_release(lp);
 }
 
+void test_irc_queue (void)
+{
+    struct sock_test *sock = sock_test_create();
+    struct line_proto *lp;
+    struct irc_queue *queue;
+    struct irc_queue_entry *queue_entry;
+    struct error_info err;
+
+    // create the lp
+    assert_success(line_proto_create(&lp, SOCK_TEST_BASE(sock), 128, &_lp_callbacks, NULL, &err));
+
+    // create the queue
+    assert_success(irc_queue_create(&queue, lp, &err));
+
+    struct irc_line line = {
+        NULL, "TEST", { "fooX" }
+    };
+
+    // then test simple writes, we should be able to push five lines directly
+    log_info("test irc_queue_process (irc_queue_send_direct)");
+    line.args[0] = "foo0"; assert_success(irc_queue_process(queue, &line));
+    line.args[0] = "foo1"; assert_success(irc_queue_process(queue, &line));
+    line.args[0] = "foo2"; assert_success(irc_queue_process(queue, &line));
+    line.args[0] = "foo3"; assert_success(irc_queue_process(queue, &line));
+    line.args[0] = "foo4"; assert_success(irc_queue_process(queue, &line));
+
+    // they should all be output
+    assert_sock_data(sock, 
+            "TEST foo0\r\n"
+            "TEST foo1\r\n"
+            "TEST foo2\r\n"
+            "TEST foo3\r\n"
+            "TEST foo4\r\n"
+    );
+
+    // then enqueue
+    log_info("test irc_queue_process (irc_queue_put)");
+    line.args[0] = "foo5"; assert_success(irc_queue_process(queue, &line));
+
+    // ensure it was enqueued
+    assert((queue_entry = TAILQ_FIRST(&queue->list)) != NULL);
+    assert_strcmp(queue_entry->line_buf, "TEST foo5\r\n");
+
+    // ensure timer is set
+    assert(event_pending(queue->ev, EV_TIMEOUT, NULL));
+
+    // run the event loop to let the timer run
+    log_info("running the event loop once...");
+    assert(event_base_loop(_test_ctx.ev_base, EVLOOP_ONCE) == 0);
+
+    // test to check that the line was now sent
+    log_info("checking that the delayed line was sent...");
+    assert_sock_data(sock, "TEST foo5\r\n");
+    assert(TAILQ_EMPTY(&queue->list));
+    assert(!event_pending(queue->ev, EV_TIMEOUT, NULL));
+}
+
 struct test_conn_ctx {
     /** Callback flags */
     bool on_registered, on_TEST, on_error, on_quit;
@@ -994,6 +1075,7 @@
     {   "dump_str",             &test_dump_str              },
     {   "sock_test",            &test_sock_test             },
     {   "line_proto",           &test_line_proto            },
+    {   "irc_queue",            &test_irc_queue             },
     // XXX: irc_line_parse_invalid_prefix
     {   "irc_conn",             &test_irc_conn              },
     {   "irc_conn_self_nick",   &test_irc_conn_self_nick    },
@@ -1017,6 +1099,7 @@
     OPT_HELP            = 'h',
     OPT_DEBUG           = 'd',
     OPT_QUIET           = 'q',
+    OPT_LIST            = 'l',
     
     /** Options without short names */
     _OPT_EXT_BEGIN      = 0x00ff,
@@ -1029,6 +1112,7 @@
     {"help",            0,  NULL,   OPT_HELP        },
     {"debug",           0,  NULL,   OPT_DEBUG       },
     {"quiet",           0,  NULL,   OPT_QUIET       },
+    {"list",            0,  NULL,   OPT_LIST        },
     {0,                 0,  0,      0               },
 };
 
@@ -1042,6 +1126,18 @@
     printf(" --help / -h            display this message\n");
     printf(" --debug / -d           display DEBUG log messages\n");
     printf(" --quiet / -q           supress INFO log messages\n");
+    printf(" --list / -l            list all tests\n");
+}
+
+static void list_tests (struct test *tests)
+{
+    struct test *test;
+    
+    printf("Available tests:\n");
+
+    for (test = tests; test->name; test++) {
+        printf("\t%s\n", test->name);
+    }
 }
 
 int main (int argc, char **argv)
@@ -1053,7 +1149,7 @@
     const char *filter = NULL;
 
     // parse options
-    while ((opt = getopt_long(argc, argv, "hdq", options, &option_index)) != -1) {
+    while ((opt = getopt_long(argc, argv, "hdql", options, &option_index)) != -1) {
         switch (opt) {
             case OPT_HELP:
                 usage(argv[0]);
@@ -1066,7 +1162,11 @@
             case OPT_QUIET:
                 set_log_level(LOG_WARN);
                 break;
-            
+           
+            case OPT_LIST:
+                list_tests(_tests);
+                exit(EXIT_SUCCESS);
+
             case '?':
                 usage(argv[0]);
                 exit(EXIT_FAILURE);
@@ -1084,6 +1184,9 @@
         }
     }
 
+    // setup the sockets stuff
+    _test_ctx.ev_base = setup_sock();
+
     // run tests
     for (test = _tests; test->name; test++) {
         if (filter && strcmp(test->name, filter))