--- a/TODO Mon Mar 30 14:45:14 2009 +0300
+++ b/TODO Mon Mar 30 16:31:24 2009 +0300
@@ -9,9 +9,6 @@
irc_net:
* reconnect, maybe cycling servers?
-irc_chan:
- * handle KICK
-
config:
* A more advanced structured value parser that can then handle all the various configuration values sanely
--- a/src/CMakeLists.txt Mon Mar 30 14:45:14 2009 +0300
+++ b/src/CMakeLists.txt Mon Mar 30 16:31:24 2009 +0300
@@ -10,7 +10,7 @@
# define our source code modules
set (CORE_SOURCES error.c log.c)
set (SOCK_SOURCES sock.c sock_tcp.c sock_gnutls.c sock_test.c line_proto.c)
-set (IRC_SOURCES irc_line.c irc_conn.c irc_net.c irc_chan.c chain.c irc_cmd.c irc_proto.c irc_client.c irc_user.c)
+set (IRC_SOURCES irc_line.c irc_conn.c irc_net.c irc_chan.c chain.c irc_cmd.c irc_proto.c irc_client.c irc_user.c irc_queue.c)
set (NEXUS_SOURCES nexus.c ${CORE_SOURCES} ${SOCK_SOURCES} ${IRC_SOURCES} signals.c module.c config.c)
set (TEST_SOURCES test.c ${CORE_SOURCES} ${SOCK_SOURCES} ${IRC_SOURCES})
--- a/src/irc_line.h Mon Mar 30 14:45:14 2009 +0300
+++ b/src/irc_line.h Mon Mar 30 16:31:24 2009 +0300
@@ -10,9 +10,9 @@
#include "error.h"
/**
- * The maximum length of a line, without terminating CRLF
+ * The maximum length of a line, without terminating CRLF, but including NUL
*/
-#define IRC_LINE_MAX 510
+#define IRC_LINE_MAX (510 + 1)
/**
* The maximum number of arguments for a single command
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/irc_queue.c Mon Mar 30 16:31:24 2009 +0300
@@ -0,0 +1,228 @@
+#include "irc_queue.h"
+#include "log.h"
+
+// XXX: for ev_base
+#include "sock_internal.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+
+/**
+ * Send a formatted line on the line_proto, and apply the IRC_QUEUE_PENALTY to the timer, resetting it to the current
+ * time if it's fallen behind.
+ */
+static err_t irc_queue_send_buf (struct irc_queue *queue, const char *line_buf)
+{
+ time_t now = time(NULL);
+ err_t err;
+ int ret;
+
+ // XXX: output buffering?
+ if ((ret = line_proto_send(queue->lp, line_buf)) < 0)
+ return (err = -ret);
+
+ // reset timer to current time if it's fallen behind
+ if (queue->timer < now)
+ queue->timer = now;
+
+ // apply penalty
+ queue->timer += IRC_QUEUE_PENALTY;
+
+ // ok
+ return SUCCESS;
+}
+
+/**
+ * Pump the queue, sending the next message. If it is given explicitly, then the given message is "pumped", and it is
+ * assumed to not be in the queue_list. Otherwise, the next entry is taken from the queue.
+ *
+ * If the queue is empty and no entry is given, does nothing.
+ */
+static err_t irc_queue_pump (struct irc_queue *queue, struct irc_queue_entry *next)
+{
+ err_t err;
+
+ // pop the next entry?
+ if (next == NULL) {
+ // take it from the head of the list
+ next = TAILQ_FIRST(&queue->list);
+
+ // nothing to do?
+ if (!next)
+ return SUCCESS;
+
+ // and then remove it
+ TAILQ_REMOVE(&queue->list, next, queue_list);
+ }
+
+ // send it
+ if ((err = irc_queue_send_buf(queue, next->line_buf)))
+ goto error;
+
+ // ok, release it
+ free(next);
+
+ // done
+ return SUCCESS;
+
+error:
+ // hmm... re-insert and return error, we can try again later
+ TAILQ_INSERT_HEAD(&queue->list, next, queue_list);
+
+ return err;
+}
+
+/**
+ * Schedule the queue's timer for execution once the timer has expired, or immediately if it's not full. If next is
+ * given, it should be the next entry to send, and not part of the queue_list.
+ */
+static err_t irc_queue_schedule (struct irc_queue *queue)
+{
+ time_t now = time(NULL);
+ struct timeval tv;
+
+ // calculate the timeout
+ int timeout = (queue->timer - (now + IRC_QUEUE_WINDOW));
+
+ // setup the timeout, zero if we don't actually need to wait...
+ tv.tv_sec = timeout > 0 ? timeout : 0;
+ tv.tv_usec = 0;
+
+ // setup the timer event
+ if (evtimer_add(queue->ev, &tv))
+ return ERR_EVENT_ADD;
+
+ // ok
+ return SUCCESS;
+}
+
+/**
+ * Our timer callback
+ */
+static void irc_queue_timer (int fd, short what, void *arg)
+{
+ time_t now = time(NULL);
+ struct irc_queue *queue = arg;
+ err_t err;
+
+ (void) fd;
+ (void) what;
+
+ // pump the queue until our timer is full again, or the queue is empty
+ while (queue->timer <= now + IRC_QUEUE_WINDOW && !TAILQ_EMPTY(&queue->list)) {
+ if ((err = irc_queue_pump(queue, NULL))) {
+ log_warn("irc_queue_pump: %s", error_name(err));
+ break;
+ }
+ }
+
+ // reschedule if needed
+ if (!TAILQ_EMPTY(&queue->list)) {
+ if ((err = irc_queue_schedule(queue))) {
+ log_err(err, "irc_queue_scheulde");
+ }
+ }
+}
+
+err_t irc_queue_create (struct irc_queue **queue_ptr, struct line_proto *lp, struct error_info *err)
+{
+ struct irc_queue *queue;
+
+ // alloc
+ if ((queue = calloc(1, sizeof(*queue))) == NULL)
+ return SET_ERROR(err, ERR_CALLOC);
+
+ // create the timer event
+ // XXX: using the sock module ev_base
+ if ((queue->ev = evtimer_new(_sock_stream_ctx.ev_base, &irc_queue_timer, queue)) == NULL)
+ JUMP_SET_ERROR(err, ERR_EVENT_NEW);
+
+ // initialize
+ queue->lp = lp;
+ queue->timer = time(NULL);
+ TAILQ_INIT(&queue->list);
+
+ // ok
+ *queue_ptr = queue;
+
+ return SUCCESS;
+
+error:
+ // cleanup
+ free(queue);
+
+ return ERROR_CODE(err);
+}
+
+/**
+ * Attempt to send a irc_line directly on the queue's line_proto, otherwise enqueueing it for later transmission.
+ */
+static err_t irc_queue_send_direct (struct irc_queue *queue, const struct irc_line *line)
+{
+ char line_buf[IRC_LINE_MAX + 2];
+ err_t err;
+
+ // format
+ if ((err = irc_line_build(line, line_buf)))
+ return err;
+
+ // add CRLF
+ strcat(line_buf, "\r\n");
+
+ // send
+ // XXX: handle send-buffer-full by enqueuing it after all
+ return irc_queue_send_buf(queue, line_buf);
+}
+
+/**
+ * Enqueue a irc_line onto the queue's list, and schedule the timer for execution
+ */
+static err_t irc_queue_put (struct irc_queue *queue, const struct irc_line *line)
+{
+ struct irc_queue_entry *entry;
+ err_t err;
+
+ // alloc
+ if ((entry = calloc(1, sizeof(*entry))) == NULL)
+ return ERR_CALLOC;
+
+ // format
+ if ((err = irc_line_build(line, entry->line_buf)))
+ goto error;
+
+ // add CRLF
+ strcat(entry->line_buf, "\r\n");
+
+ // then re-schedule the queue
+ if ((err = irc_queue_schedule(queue)))
+ goto error;
+
+ // append to end of list
+ TAILQ_INSERT_TAIL(&queue->list, entry, queue_list);
+
+ // ok
+ return SUCCESS;
+
+error:
+ // cleanup
+ free(entry);
+
+ return err;
+}
+
+err_t irc_queue_process (struct irc_queue *queue, const struct irc_line *line)
+{
+ // current time
+ time_t now = time(NULL);
+
+ if (queue->timer < now + IRC_QUEUE_WINDOW) {
+ // timer is OK, send directly
+ return irc_queue_send_direct(queue, line);
+
+ } else {
+ // enqueue for later transmission
+ return irc_queue_put(queue, line);
+ }
+}
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/irc_queue.h Mon Mar 30 16:31:24 2009 +0300
@@ -0,0 +1,84 @@
+#ifndef IRC_QUEUE_H
+#define IRC_QUEUE_H
+
+/**
+ * @file
+ *
+ * Ratelimited queue of outgoing irc_line's for use with irc_conn.
+ *
+ * This implements the basic flood control algorithm as described in RFC1459 section 8.10.
+ */
+#include "irc_line.h"
+#include "line_proto.h"
+#include "error.h"
+
+#include <sys/queue.h>
+#include <event2/event.h>
+
+/**
+ * Number of seconds of penalty applied for each message
+ */
+#define IRC_QUEUE_PENALTY 2
+
+/**
+ * Maximum allowed burst, in seconds
+ */
+#define IRC_QUEUE_WINDOW 10
+
+/**
+ * An enqueued irc_line for later delivery
+ */
+struct irc_queue_entry {
+ /** The formatted irc_line data, including terminating CRLF */
+ char line_buf[IRC_LINE_MAX + 2];
+
+ /** Our entry in the irc_queue list */
+ TAILQ_ENTRY(irc_queue_entry) queue_list;
+};
+
+/**
+ * The queue state and timing algorithm, as described in the RFC:
+ * * A line will be sent directly if the current timer is less than IRC_QUEUE_WINDOW seconds into the future, and
+ * IRC_QUEUE_PENALTY seconds are added on to the timer.
+ * * Otherwise, the line is stored as a irc_queue_entry and enqueued for later transmission, once the timer value
+ * drops below IRC_QUEUE_WINDOW seconds into the future.
+ * * The above is repeated for each dequeued line.
+ *
+ * Additionally, if sending a line fails due to the line_proto socket buffer being full, this also handles this case
+ * by queueing the line for later sending.
+ *
+ * Note that the timing behaviour of this is rather unexact - the socket layer may introduce its own delays/jitter which
+ * we can't measure or control here, but at least we make an effort. This should work OK as long as the outgoing lines
+ * don't accumulate in the socket's write buffer too much. XXX: maybe tweak socket params to try and "disable"
+ * buffering on our end to improve accuracy?
+ */
+struct irc_queue {
+ /** The line_proto that we can send the messages out on */
+ struct line_proto *lp;
+
+ /** Our timeout used for delaying messages */
+ struct event *ev;
+
+ /** Current "message timer" value, may be up to IRC_QUEUE_WINDOW seconds in the future */
+ time_t timer;
+
+ /** The actual queue of messages */
+ TAILQ_HEAD(irc_queue_entry_list, irc_queue_entry) list;
+};
+
+/**
+ * Create a new irc_queue for use with the given line_proto
+ */
+err_t irc_queue_create (struct irc_queue **queue_ptr, struct line_proto *lp, struct error_info *err);
+
+/**
+ * Process a line, either sending it directly, or enqueueing it, based on the timer state.
+ */
+err_t irc_queue_process (struct irc_queue *queue, const struct irc_line *line);
+
+/**
+ * Destroy the irc_queue, releasing all queued lines
+ */
+void irc_queue_destroy (struct irc_queue *queue);
+
+#endif /* IRC_QUEUE_H */
--- a/src/line_proto.h Mon Mar 30 14:45:14 2009 +0300
+++ b/src/line_proto.h Mon Mar 30 16:31:24 2009 +0300
@@ -52,7 +52,7 @@
err_t line_proto_recv (struct line_proto *lp, char **line_ptr);
/**
- * Write a single line to the sock_stream, buffering any incomplete fragment that remains unset. Returns zero if the
+ * Write a single line to the sock_stream, buffering any incomplete fragment that remains unsent. Returns zero if the
* line was succesfully sent, >0 if it was only partially sent, or -err on errors.
*
* The given line should already include the terminating '\r\n' character sequence.
--- a/src/test.c Mon Mar 30 14:45:14 2009 +0300
+++ b/src/test.c Mon Mar 30 16:31:24 2009 +0300
@@ -3,6 +3,7 @@
*/
#include "sock_test.h"
#include "line_proto.h"
+#include "irc_queue.h"
#include "irc_conn.h"
#include "irc_net.h"
#include "log.h"
@@ -18,6 +19,15 @@
#define DUMP_STR_COUNT 8
#define DUMP_STR_TAIL 10
+/**
+ * Global test-running state
+ */
+struct test_ctx {
+ /** The event_base that we have setup */
+ struct event_base *ev_base;
+
+} _test_ctx;
+
char *dump_str_append (char *buf, const char *str)
{
while (*str)
@@ -257,6 +267,20 @@
}
/**
+ * Setup the global sock_stream state
+ */
+struct event_base* setup_sock (void)
+{
+ struct event_base *ev_base;
+ struct error_info err;
+
+ assert((ev_base = event_base_new()));
+ assert_success(sock_init(ev_base, &err));
+
+ return ev_base;
+}
+
+/**
* Create an empty sock_test
*/
struct sock_test* setup_sock_test (void)
@@ -416,6 +440,63 @@
line_proto_release(lp);
}
+void test_irc_queue (void)
+{
+ struct sock_test *sock = sock_test_create();
+ struct line_proto *lp;
+ struct irc_queue *queue;
+ struct irc_queue_entry *queue_entry;
+ struct error_info err;
+
+ // create the lp
+ assert_success(line_proto_create(&lp, SOCK_TEST_BASE(sock), 128, &_lp_callbacks, NULL, &err));
+
+ // create the queue
+ assert_success(irc_queue_create(&queue, lp, &err));
+
+ struct irc_line line = {
+ NULL, "TEST", { "fooX" }
+ };
+
+ // then test simple writes, we should be able to push five lines directly
+ log_info("test irc_queue_process (irc_queue_send_direct)");
+ line.args[0] = "foo0"; assert_success(irc_queue_process(queue, &line));
+ line.args[0] = "foo1"; assert_success(irc_queue_process(queue, &line));
+ line.args[0] = "foo2"; assert_success(irc_queue_process(queue, &line));
+ line.args[0] = "foo3"; assert_success(irc_queue_process(queue, &line));
+ line.args[0] = "foo4"; assert_success(irc_queue_process(queue, &line));
+
+ // they should all be output
+ assert_sock_data(sock,
+ "TEST foo0\r\n"
+ "TEST foo1\r\n"
+ "TEST foo2\r\n"
+ "TEST foo3\r\n"
+ "TEST foo4\r\n"
+ );
+
+ // then enqueue
+ log_info("test irc_queue_process (irc_queue_put)");
+ line.args[0] = "foo5"; assert_success(irc_queue_process(queue, &line));
+
+ // ensure it was enqueued
+ assert((queue_entry = TAILQ_FIRST(&queue->list)) != NULL);
+ assert_strcmp(queue_entry->line_buf, "TEST foo5\r\n");
+
+ // ensure timer is set
+ assert(event_pending(queue->ev, EV_TIMEOUT, NULL));
+
+ // run the event loop to let the timer run
+ log_info("running the event loop once...");
+ assert(event_base_loop(_test_ctx.ev_base, EVLOOP_ONCE) == 0);
+
+ // test to check that the line was now sent
+ log_info("checking that the delayed line was sent...");
+ assert_sock_data(sock, "TEST foo5\r\n");
+ assert(TAILQ_EMPTY(&queue->list));
+ assert(!event_pending(queue->ev, EV_TIMEOUT, NULL));
+}
+
struct test_conn_ctx {
/** Callback flags */
bool on_registered, on_TEST, on_error, on_quit;
@@ -994,6 +1075,7 @@
{ "dump_str", &test_dump_str },
{ "sock_test", &test_sock_test },
{ "line_proto", &test_line_proto },
+ { "irc_queue", &test_irc_queue },
// XXX: irc_line_parse_invalid_prefix
{ "irc_conn", &test_irc_conn },
{ "irc_conn_self_nick", &test_irc_conn_self_nick },
@@ -1017,6 +1099,7 @@
OPT_HELP = 'h',
OPT_DEBUG = 'd',
OPT_QUIET = 'q',
+ OPT_LIST = 'l',
/** Options without short names */
_OPT_EXT_BEGIN = 0x00ff,
@@ -1029,6 +1112,7 @@
{"help", 0, NULL, OPT_HELP },
{"debug", 0, NULL, OPT_DEBUG },
{"quiet", 0, NULL, OPT_QUIET },
+ {"list", 0, NULL, OPT_LIST },
{0, 0, 0, 0 },
};
@@ -1042,6 +1126,18 @@
printf(" --help / -h display this message\n");
printf(" --debug / -d display DEBUG log messages\n");
printf(" --quiet / -q supress INFO log messages\n");
+ printf(" --list / -l list all tests\n");
+}
+
+static void list_tests (struct test *tests)
+{
+ struct test *test;
+
+ printf("Available tests:\n");
+
+ for (test = tests; test->name; test++) {
+ printf("\t%s\n", test->name);
+ }
}
int main (int argc, char **argv)
@@ -1053,7 +1149,7 @@
const char *filter = NULL;
// parse options
- while ((opt = getopt_long(argc, argv, "hdq", options, &option_index)) != -1) {
+ while ((opt = getopt_long(argc, argv, "hdql", options, &option_index)) != -1) {
switch (opt) {
case OPT_HELP:
usage(argv[0]);
@@ -1066,7 +1162,11 @@
case OPT_QUIET:
set_log_level(LOG_WARN);
break;
-
+
+ case OPT_LIST:
+ list_tests(_tests);
+ exit(EXIT_SUCCESS);
+
case '?':
usage(argv[0]);
exit(EXIT_FAILURE);
@@ -1084,6 +1184,9 @@
}
}
+ // setup the sockets stuff
+ _test_ctx.ev_base = setup_sock();
+
// run tests
for (test = _tests; test->name; test++) {
if (filter && strcmp(test->name, filter))