# HG changeset patch # User Tero Marttila # Date 1238419884 -10800 # Node ID 9d489b1039b24f5e12cdbf09668fd2a39c14c94b # Parent 68345a9b99a306cf0f0d27dc5678074beb524f9d implement irc_queue, with some basic functionality tests diff -r 68345a9b99a3 -r 9d489b1039b2 TODO --- a/TODO Mon Mar 30 14:45:14 2009 +0300 +++ b/TODO Mon Mar 30 16:31:24 2009 +0300 @@ -9,9 +9,6 @@ irc_net: * reconnect, maybe cycling servers? -irc_chan: - * handle KICK - config: * A more advanced structured value parser that can then handle all the various configuration values sanely diff -r 68345a9b99a3 -r 9d489b1039b2 src/CMakeLists.txt --- a/src/CMakeLists.txt Mon Mar 30 14:45:14 2009 +0300 +++ b/src/CMakeLists.txt Mon Mar 30 16:31:24 2009 +0300 @@ -10,7 +10,7 @@ # define our source code modules set (CORE_SOURCES error.c log.c) set (SOCK_SOURCES sock.c sock_tcp.c sock_gnutls.c sock_test.c line_proto.c) -set (IRC_SOURCES irc_line.c irc_conn.c irc_net.c irc_chan.c chain.c irc_cmd.c irc_proto.c irc_client.c irc_user.c) +set (IRC_SOURCES irc_line.c irc_conn.c irc_net.c irc_chan.c chain.c irc_cmd.c irc_proto.c irc_client.c irc_user.c irc_queue.c) set (NEXUS_SOURCES nexus.c ${CORE_SOURCES} ${SOCK_SOURCES} ${IRC_SOURCES} signals.c module.c config.c) set (TEST_SOURCES test.c ${CORE_SOURCES} ${SOCK_SOURCES} ${IRC_SOURCES}) diff -r 68345a9b99a3 -r 9d489b1039b2 src/irc_line.h --- a/src/irc_line.h Mon Mar 30 14:45:14 2009 +0300 +++ b/src/irc_line.h Mon Mar 30 16:31:24 2009 +0300 @@ -10,9 +10,9 @@ #include "error.h" /** - * The maximum length of a line, without terminating CRLF + * The maximum length of a line, without terminating CRLF, but including NUL */ -#define IRC_LINE_MAX 510 +#define IRC_LINE_MAX (510 + 1) /** * The maximum number of arguments for a single command diff -r 68345a9b99a3 -r 9d489b1039b2 src/irc_queue.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/irc_queue.c Mon Mar 30 16:31:24 2009 +0300 @@ -0,0 +1,228 @@ +#include "irc_queue.h" +#include "log.h" + +// XXX: for ev_base +#include "sock_internal.h" + +#include +#include +#include + +/** + * Send a formatted line on the line_proto, and apply the IRC_QUEUE_PENALTY to the timer, resetting it to the current + * time if it's fallen behind. + */ +static err_t irc_queue_send_buf (struct irc_queue *queue, const char *line_buf) +{ + time_t now = time(NULL); + err_t err; + int ret; + + // XXX: output buffering? + if ((ret = line_proto_send(queue->lp, line_buf)) < 0) + return (err = -ret); + + // reset timer to current time if it's fallen behind + if (queue->timer < now) + queue->timer = now; + + // apply penalty + queue->timer += IRC_QUEUE_PENALTY; + + // ok + return SUCCESS; +} + +/** + * Pump the queue, sending the next message. If it is given explicitly, then the given message is "pumped", and it is + * assumed to not be in the queue_list. Otherwise, the next entry is taken from the queue. + * + * If the queue is empty and no entry is given, does nothing. + */ +static err_t irc_queue_pump (struct irc_queue *queue, struct irc_queue_entry *next) +{ + err_t err; + + // pop the next entry? + if (next == NULL) { + // take it from the head of the list + next = TAILQ_FIRST(&queue->list); + + // nothing to do? + if (!next) + return SUCCESS; + + // and then remove it + TAILQ_REMOVE(&queue->list, next, queue_list); + } + + // send it + if ((err = irc_queue_send_buf(queue, next->line_buf))) + goto error; + + // ok, release it + free(next); + + // done + return SUCCESS; + +error: + // hmm... re-insert and return error, we can try again later + TAILQ_INSERT_HEAD(&queue->list, next, queue_list); + + return err; +} + +/** + * Schedule the queue's timer for execution once the timer has expired, or immediately if it's not full. If next is + * given, it should be the next entry to send, and not part of the queue_list. + */ +static err_t irc_queue_schedule (struct irc_queue *queue) +{ + time_t now = time(NULL); + struct timeval tv; + + // calculate the timeout + int timeout = (queue->timer - (now + IRC_QUEUE_WINDOW)); + + // setup the timeout, zero if we don't actually need to wait... + tv.tv_sec = timeout > 0 ? timeout : 0; + tv.tv_usec = 0; + + // setup the timer event + if (evtimer_add(queue->ev, &tv)) + return ERR_EVENT_ADD; + + // ok + return SUCCESS; +} + +/** + * Our timer callback + */ +static void irc_queue_timer (int fd, short what, void *arg) +{ + time_t now = time(NULL); + struct irc_queue *queue = arg; + err_t err; + + (void) fd; + (void) what; + + // pump the queue until our timer is full again, or the queue is empty + while (queue->timer <= now + IRC_QUEUE_WINDOW && !TAILQ_EMPTY(&queue->list)) { + if ((err = irc_queue_pump(queue, NULL))) { + log_warn("irc_queue_pump: %s", error_name(err)); + break; + } + } + + // reschedule if needed + if (!TAILQ_EMPTY(&queue->list)) { + if ((err = irc_queue_schedule(queue))) { + log_err(err, "irc_queue_scheulde"); + } + } +} + +err_t irc_queue_create (struct irc_queue **queue_ptr, struct line_proto *lp, struct error_info *err) +{ + struct irc_queue *queue; + + // alloc + if ((queue = calloc(1, sizeof(*queue))) == NULL) + return SET_ERROR(err, ERR_CALLOC); + + // create the timer event + // XXX: using the sock module ev_base + if ((queue->ev = evtimer_new(_sock_stream_ctx.ev_base, &irc_queue_timer, queue)) == NULL) + JUMP_SET_ERROR(err, ERR_EVENT_NEW); + + // initialize + queue->lp = lp; + queue->timer = time(NULL); + TAILQ_INIT(&queue->list); + + // ok + *queue_ptr = queue; + + return SUCCESS; + +error: + // cleanup + free(queue); + + return ERROR_CODE(err); +} + +/** + * Attempt to send a irc_line directly on the queue's line_proto, otherwise enqueueing it for later transmission. + */ +static err_t irc_queue_send_direct (struct irc_queue *queue, const struct irc_line *line) +{ + char line_buf[IRC_LINE_MAX + 2]; + err_t err; + + // format + if ((err = irc_line_build(line, line_buf))) + return err; + + // add CRLF + strcat(line_buf, "\r\n"); + + // send + // XXX: handle send-buffer-full by enqueuing it after all + return irc_queue_send_buf(queue, line_buf); +} + +/** + * Enqueue a irc_line onto the queue's list, and schedule the timer for execution + */ +static err_t irc_queue_put (struct irc_queue *queue, const struct irc_line *line) +{ + struct irc_queue_entry *entry; + err_t err; + + // alloc + if ((entry = calloc(1, sizeof(*entry))) == NULL) + return ERR_CALLOC; + + // format + if ((err = irc_line_build(line, entry->line_buf))) + goto error; + + // add CRLF + strcat(entry->line_buf, "\r\n"); + + // then re-schedule the queue + if ((err = irc_queue_schedule(queue))) + goto error; + + // append to end of list + TAILQ_INSERT_TAIL(&queue->list, entry, queue_list); + + // ok + return SUCCESS; + +error: + // cleanup + free(entry); + + return err; +} + +err_t irc_queue_process (struct irc_queue *queue, const struct irc_line *line) +{ + // current time + time_t now = time(NULL); + + if (queue->timer < now + IRC_QUEUE_WINDOW) { + // timer is OK, send directly + return irc_queue_send_direct(queue, line); + + } else { + // enqueue for later transmission + return irc_queue_put(queue, line); + } +} + diff -r 68345a9b99a3 -r 9d489b1039b2 src/irc_queue.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/irc_queue.h Mon Mar 30 16:31:24 2009 +0300 @@ -0,0 +1,84 @@ +#ifndef IRC_QUEUE_H +#define IRC_QUEUE_H + +/** + * @file + * + * Ratelimited queue of outgoing irc_line's for use with irc_conn. + * + * This implements the basic flood control algorithm as described in RFC1459 section 8.10. + */ +#include "irc_line.h" +#include "line_proto.h" +#include "error.h" + +#include +#include + +/** + * Number of seconds of penalty applied for each message + */ +#define IRC_QUEUE_PENALTY 2 + +/** + * Maximum allowed burst, in seconds + */ +#define IRC_QUEUE_WINDOW 10 + +/** + * An enqueued irc_line for later delivery + */ +struct irc_queue_entry { + /** The formatted irc_line data, including terminating CRLF */ + char line_buf[IRC_LINE_MAX + 2]; + + /** Our entry in the irc_queue list */ + TAILQ_ENTRY(irc_queue_entry) queue_list; +}; + +/** + * The queue state and timing algorithm, as described in the RFC: + * * A line will be sent directly if the current timer is less than IRC_QUEUE_WINDOW seconds into the future, and + * IRC_QUEUE_PENALTY seconds are added on to the timer. + * * Otherwise, the line is stored as a irc_queue_entry and enqueued for later transmission, once the timer value + * drops below IRC_QUEUE_WINDOW seconds into the future. + * * The above is repeated for each dequeued line. + * + * Additionally, if sending a line fails due to the line_proto socket buffer being full, this also handles this case + * by queueing the line for later sending. + * + * Note that the timing behaviour of this is rather unexact - the socket layer may introduce its own delays/jitter which + * we can't measure or control here, but at least we make an effort. This should work OK as long as the outgoing lines + * don't accumulate in the socket's write buffer too much. XXX: maybe tweak socket params to try and "disable" + * buffering on our end to improve accuracy? + */ +struct irc_queue { + /** The line_proto that we can send the messages out on */ + struct line_proto *lp; + + /** Our timeout used for delaying messages */ + struct event *ev; + + /** Current "message timer" value, may be up to IRC_QUEUE_WINDOW seconds in the future */ + time_t timer; + + /** The actual queue of messages */ + TAILQ_HEAD(irc_queue_entry_list, irc_queue_entry) list; +}; + +/** + * Create a new irc_queue for use with the given line_proto + */ +err_t irc_queue_create (struct irc_queue **queue_ptr, struct line_proto *lp, struct error_info *err); + +/** + * Process a line, either sending it directly, or enqueueing it, based on the timer state. + */ +err_t irc_queue_process (struct irc_queue *queue, const struct irc_line *line); + +/** + * Destroy the irc_queue, releasing all queued lines + */ +void irc_queue_destroy (struct irc_queue *queue); + +#endif /* IRC_QUEUE_H */ diff -r 68345a9b99a3 -r 9d489b1039b2 src/line_proto.h --- a/src/line_proto.h Mon Mar 30 14:45:14 2009 +0300 +++ b/src/line_proto.h Mon Mar 30 16:31:24 2009 +0300 @@ -52,7 +52,7 @@ err_t line_proto_recv (struct line_proto *lp, char **line_ptr); /** - * Write a single line to the sock_stream, buffering any incomplete fragment that remains unset. Returns zero if the + * Write a single line to the sock_stream, buffering any incomplete fragment that remains unsent. Returns zero if the * line was succesfully sent, >0 if it was only partially sent, or -err on errors. * * The given line should already include the terminating '\r\n' character sequence. diff -r 68345a9b99a3 -r 9d489b1039b2 src/test.c --- a/src/test.c Mon Mar 30 14:45:14 2009 +0300 +++ b/src/test.c Mon Mar 30 16:31:24 2009 +0300 @@ -3,6 +3,7 @@ */ #include "sock_test.h" #include "line_proto.h" +#include "irc_queue.h" #include "irc_conn.h" #include "irc_net.h" #include "log.h" @@ -18,6 +19,15 @@ #define DUMP_STR_COUNT 8 #define DUMP_STR_TAIL 10 +/** + * Global test-running state + */ +struct test_ctx { + /** The event_base that we have setup */ + struct event_base *ev_base; + +} _test_ctx; + char *dump_str_append (char *buf, const char *str) { while (*str) @@ -257,6 +267,20 @@ } /** + * Setup the global sock_stream state + */ +struct event_base* setup_sock (void) +{ + struct event_base *ev_base; + struct error_info err; + + assert((ev_base = event_base_new())); + assert_success(sock_init(ev_base, &err)); + + return ev_base; +} + +/** * Create an empty sock_test */ struct sock_test* setup_sock_test (void) @@ -416,6 +440,63 @@ line_proto_release(lp); } +void test_irc_queue (void) +{ + struct sock_test *sock = sock_test_create(); + struct line_proto *lp; + struct irc_queue *queue; + struct irc_queue_entry *queue_entry; + struct error_info err; + + // create the lp + assert_success(line_proto_create(&lp, SOCK_TEST_BASE(sock), 128, &_lp_callbacks, NULL, &err)); + + // create the queue + assert_success(irc_queue_create(&queue, lp, &err)); + + struct irc_line line = { + NULL, "TEST", { "fooX" } + }; + + // then test simple writes, we should be able to push five lines directly + log_info("test irc_queue_process (irc_queue_send_direct)"); + line.args[0] = "foo0"; assert_success(irc_queue_process(queue, &line)); + line.args[0] = "foo1"; assert_success(irc_queue_process(queue, &line)); + line.args[0] = "foo2"; assert_success(irc_queue_process(queue, &line)); + line.args[0] = "foo3"; assert_success(irc_queue_process(queue, &line)); + line.args[0] = "foo4"; assert_success(irc_queue_process(queue, &line)); + + // they should all be output + assert_sock_data(sock, + "TEST foo0\r\n" + "TEST foo1\r\n" + "TEST foo2\r\n" + "TEST foo3\r\n" + "TEST foo4\r\n" + ); + + // then enqueue + log_info("test irc_queue_process (irc_queue_put)"); + line.args[0] = "foo5"; assert_success(irc_queue_process(queue, &line)); + + // ensure it was enqueued + assert((queue_entry = TAILQ_FIRST(&queue->list)) != NULL); + assert_strcmp(queue_entry->line_buf, "TEST foo5\r\n"); + + // ensure timer is set + assert(event_pending(queue->ev, EV_TIMEOUT, NULL)); + + // run the event loop to let the timer run + log_info("running the event loop once..."); + assert(event_base_loop(_test_ctx.ev_base, EVLOOP_ONCE) == 0); + + // test to check that the line was now sent + log_info("checking that the delayed line was sent..."); + assert_sock_data(sock, "TEST foo5\r\n"); + assert(TAILQ_EMPTY(&queue->list)); + assert(!event_pending(queue->ev, EV_TIMEOUT, NULL)); +} + struct test_conn_ctx { /** Callback flags */ bool on_registered, on_TEST, on_error, on_quit; @@ -994,6 +1075,7 @@ { "dump_str", &test_dump_str }, { "sock_test", &test_sock_test }, { "line_proto", &test_line_proto }, + { "irc_queue", &test_irc_queue }, // XXX: irc_line_parse_invalid_prefix { "irc_conn", &test_irc_conn }, { "irc_conn_self_nick", &test_irc_conn_self_nick }, @@ -1017,6 +1099,7 @@ OPT_HELP = 'h', OPT_DEBUG = 'd', OPT_QUIET = 'q', + OPT_LIST = 'l', /** Options without short names */ _OPT_EXT_BEGIN = 0x00ff, @@ -1029,6 +1112,7 @@ {"help", 0, NULL, OPT_HELP }, {"debug", 0, NULL, OPT_DEBUG }, {"quiet", 0, NULL, OPT_QUIET }, + {"list", 0, NULL, OPT_LIST }, {0, 0, 0, 0 }, }; @@ -1042,6 +1126,18 @@ printf(" --help / -h display this message\n"); printf(" --debug / -d display DEBUG log messages\n"); printf(" --quiet / -q supress INFO log messages\n"); + printf(" --list / -l list all tests\n"); +} + +static void list_tests (struct test *tests) +{ + struct test *test; + + printf("Available tests:\n"); + + for (test = tests; test->name; test++) { + printf("\t%s\n", test->name); + } } int main (int argc, char **argv) @@ -1053,7 +1149,7 @@ const char *filter = NULL; // parse options - while ((opt = getopt_long(argc, argv, "hdq", options, &option_index)) != -1) { + while ((opt = getopt_long(argc, argv, "hdql", options, &option_index)) != -1) { switch (opt) { case OPT_HELP: usage(argv[0]); @@ -1066,7 +1162,11 @@ case OPT_QUIET: set_log_level(LOG_WARN); break; - + + case OPT_LIST: + list_tests(_tests); + exit(EXIT_SUCCESS); + case '?': usage(argv[0]); exit(EXIT_FAILURE); @@ -1084,6 +1184,9 @@ } } + // setup the sockets stuff + _test_ctx.ev_base = setup_sock(); + // run tests for (test = _tests; test->name; test++) { if (filter && strcmp(test->name, filter))