--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/irc_queue.c Mon Mar 30 16:31:24 2009 +0300
@@ -0,0 +1,228 @@
+#include "irc_queue.h"
+#include "log.h"
+
+// XXX: for ev_base
+#include "sock_internal.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+
+/**
+ * Send a formatted line on the line_proto, and apply the IRC_QUEUE_PENALTY to the timer, resetting it to the current
+ * time if it's fallen behind.
+ */
+static err_t irc_queue_send_buf (struct irc_queue *queue, const char *line_buf)
+{
+ time_t now = time(NULL);
+ err_t err;
+ int ret;
+
+ // XXX: output buffering?
+ if ((ret = line_proto_send(queue->lp, line_buf)) < 0)
+ return (err = -ret);
+
+ // reset timer to current time if it's fallen behind
+ if (queue->timer < now)
+ queue->timer = now;
+
+ // apply penalty
+ queue->timer += IRC_QUEUE_PENALTY;
+
+ // ok
+ return SUCCESS;
+}
+
+/**
+ * Pump the queue, sending the next message. If it is given explicitly, then the given message is "pumped", and it is
+ * assumed to not be in the queue_list. Otherwise, the next entry is taken from the queue.
+ *
+ * If the queue is empty and no entry is given, does nothing.
+ */
+static err_t irc_queue_pump (struct irc_queue *queue, struct irc_queue_entry *next)
+{
+ err_t err;
+
+ // pop the next entry?
+ if (next == NULL) {
+ // take it from the head of the list
+ next = TAILQ_FIRST(&queue->list);
+
+ // nothing to do?
+ if (!next)
+ return SUCCESS;
+
+ // and then remove it
+ TAILQ_REMOVE(&queue->list, next, queue_list);
+ }
+
+ // send it
+ if ((err = irc_queue_send_buf(queue, next->line_buf)))
+ goto error;
+
+ // ok, release it
+ free(next);
+
+ // done
+ return SUCCESS;
+
+error:
+ // hmm... re-insert and return error, we can try again later
+ TAILQ_INSERT_HEAD(&queue->list, next, queue_list);
+
+ return err;
+}
+
+/**
+ * Schedule the queue's timer for execution once the timer has expired, or immediately if it's not full. If next is
+ * given, it should be the next entry to send, and not part of the queue_list.
+ */
+static err_t irc_queue_schedule (struct irc_queue *queue)
+{
+ time_t now = time(NULL);
+ struct timeval tv;
+
+ // calculate the timeout
+ int timeout = (queue->timer - (now + IRC_QUEUE_WINDOW));
+
+ // setup the timeout, zero if we don't actually need to wait...
+ tv.tv_sec = timeout > 0 ? timeout : 0;
+ tv.tv_usec = 0;
+
+ // setup the timer event
+ if (evtimer_add(queue->ev, &tv))
+ return ERR_EVENT_ADD;
+
+ // ok
+ return SUCCESS;
+}
+
+/**
+ * Our timer callback
+ */
+static void irc_queue_timer (int fd, short what, void *arg)
+{
+ time_t now = time(NULL);
+ struct irc_queue *queue = arg;
+ err_t err;
+
+ (void) fd;
+ (void) what;
+
+ // pump the queue until our timer is full again, or the queue is empty
+ while (queue->timer <= now + IRC_QUEUE_WINDOW && !TAILQ_EMPTY(&queue->list)) {
+ if ((err = irc_queue_pump(queue, NULL))) {
+ log_warn("irc_queue_pump: %s", error_name(err));
+ break;
+ }
+ }
+
+ // reschedule if needed
+ if (!TAILQ_EMPTY(&queue->list)) {
+ if ((err = irc_queue_schedule(queue))) {
+ log_err(err, "irc_queue_scheulde");
+ }
+ }
+}
+
+err_t irc_queue_create (struct irc_queue **queue_ptr, struct line_proto *lp, struct error_info *err)
+{
+ struct irc_queue *queue;
+
+ // alloc
+ if ((queue = calloc(1, sizeof(*queue))) == NULL)
+ return SET_ERROR(err, ERR_CALLOC);
+
+ // create the timer event
+ // XXX: using the sock module ev_base
+ if ((queue->ev = evtimer_new(_sock_stream_ctx.ev_base, &irc_queue_timer, queue)) == NULL)
+ JUMP_SET_ERROR(err, ERR_EVENT_NEW);
+
+ // initialize
+ queue->lp = lp;
+ queue->timer = time(NULL);
+ TAILQ_INIT(&queue->list);
+
+ // ok
+ *queue_ptr = queue;
+
+ return SUCCESS;
+
+error:
+ // cleanup
+ free(queue);
+
+ return ERROR_CODE(err);
+}
+
+/**
+ * Attempt to send a irc_line directly on the queue's line_proto, otherwise enqueueing it for later transmission.
+ */
+static err_t irc_queue_send_direct (struct irc_queue *queue, const struct irc_line *line)
+{
+ char line_buf[IRC_LINE_MAX + 2];
+ err_t err;
+
+ // format
+ if ((err = irc_line_build(line, line_buf)))
+ return err;
+
+ // add CRLF
+ strcat(line_buf, "\r\n");
+
+ // send
+ // XXX: handle send-buffer-full by enqueuing it after all
+ return irc_queue_send_buf(queue, line_buf);
+}
+
+/**
+ * Enqueue a irc_line onto the queue's list, and schedule the timer for execution
+ */
+static err_t irc_queue_put (struct irc_queue *queue, const struct irc_line *line)
+{
+ struct irc_queue_entry *entry;
+ err_t err;
+
+ // alloc
+ if ((entry = calloc(1, sizeof(*entry))) == NULL)
+ return ERR_CALLOC;
+
+ // format
+ if ((err = irc_line_build(line, entry->line_buf)))
+ goto error;
+
+ // add CRLF
+ strcat(entry->line_buf, "\r\n");
+
+ // then re-schedule the queue
+ if ((err = irc_queue_schedule(queue)))
+ goto error;
+
+ // append to end of list
+ TAILQ_INSERT_TAIL(&queue->list, entry, queue_list);
+
+ // ok
+ return SUCCESS;
+
+error:
+ // cleanup
+ free(entry);
+
+ return err;
+}
+
+err_t irc_queue_process (struct irc_queue *queue, const struct irc_line *line)
+{
+ // current time
+ time_t now = time(NULL);
+
+ if (queue->timer < now + IRC_QUEUE_WINDOW) {
+ // timer is OK, send directly
+ return irc_queue_send_direct(queue, line);
+
+ } else {
+ // enqueue for later transmission
+ return irc_queue_put(queue, line);
+ }
+}
+