src/irc_queue.c
changeset 90 9d489b1039b2
child 91 bca23cbe1dce
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/irc_queue.c	Mon Mar 30 16:31:24 2009 +0300
@@ -0,0 +1,228 @@
+#include "irc_queue.h"
+#include "log.h"
+
+// XXX: for ev_base
+#include "sock_internal.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+
+/**
+ * Send a formatted line on the line_proto, and apply the IRC_QUEUE_PENALTY to the timer, resetting it to the current
+ * time if it's fallen behind.
+ */
+static err_t irc_queue_send_buf (struct irc_queue *queue, const char *line_buf)
+{
+    time_t now = time(NULL);
+    err_t err;
+    int ret;
+    
+    // XXX: output buffering?
+    if ((ret = line_proto_send(queue->lp, line_buf)) < 0)
+        return (err = -ret);
+    
+    // reset timer to current time if it's fallen behind
+    if (queue->timer < now)
+        queue->timer = now;
+
+    // apply penalty
+    queue->timer += IRC_QUEUE_PENALTY;
+    
+    // ok
+    return SUCCESS;
+}
+
+/**
+ * Pump the queue, sending the next message. If it is given explicitly, then the given message is "pumped", and it is
+ * assumed to not be in the queue_list. Otherwise, the next entry is taken from the queue.
+ *
+ * If the queue is empty and no entry is given, does nothing.
+ */
+static err_t irc_queue_pump (struct irc_queue *queue, struct irc_queue_entry *next)
+{
+    err_t err;
+
+    // pop the next entry?
+    if (next == NULL) {
+        // take it from the head of the list
+        next = TAILQ_FIRST(&queue->list);
+    
+        // nothing to do?
+        if (!next)
+            return SUCCESS;
+        
+        // and then remove it
+        TAILQ_REMOVE(&queue->list, next, queue_list);
+    }
+
+    // send it
+    if ((err = irc_queue_send_buf(queue, next->line_buf)))
+        goto error;
+    
+    // ok, release it
+    free(next);
+    
+    // done
+    return SUCCESS;
+
+error:
+    // hmm... re-insert and return error, we can try again later
+    TAILQ_INSERT_HEAD(&queue->list, next, queue_list);
+
+    return err;    
+}
+
+/**
+ * Schedule the queue's timer for execution once the timer has expired, or immediately if it's not full. If next is
+ * given, it should be the next entry to send, and not part of the queue_list.
+ */
+static err_t irc_queue_schedule (struct irc_queue *queue)
+{
+    time_t now = time(NULL);
+    struct timeval tv;
+    
+    // calculate the timeout
+    int timeout = (queue->timer - (now + IRC_QUEUE_WINDOW));
+    
+    // setup the timeout, zero if we don't actually need to wait...
+    tv.tv_sec = timeout > 0 ? timeout : 0;
+    tv.tv_usec = 0;
+
+    // setup the timer event
+    if (evtimer_add(queue->ev, &tv))
+        return ERR_EVENT_ADD;
+
+    // ok
+    return SUCCESS;
+}
+
+/**
+ * Our timer callback
+ */
+static void irc_queue_timer (int fd, short what, void *arg)
+{
+    time_t now = time(NULL);
+    struct irc_queue *queue = arg;
+    err_t err;
+
+    (void) fd;
+    (void) what;
+
+    // pump the queue until our timer is full again, or the queue is empty
+    while (queue->timer <= now + IRC_QUEUE_WINDOW && !TAILQ_EMPTY(&queue->list)) {
+        if ((err = irc_queue_pump(queue, NULL))) {
+            log_warn("irc_queue_pump: %s", error_name(err));
+            break;
+        }
+    }
+
+    // reschedule if needed
+    if (!TAILQ_EMPTY(&queue->list)) {
+        if ((err = irc_queue_schedule(queue))) {
+            log_err(err, "irc_queue_scheulde");
+        }
+    }
+}
+
+err_t irc_queue_create (struct irc_queue **queue_ptr, struct line_proto *lp, struct error_info *err)
+{
+    struct irc_queue *queue;
+
+    // alloc
+    if ((queue = calloc(1, sizeof(*queue))) == NULL)
+        return SET_ERROR(err, ERR_CALLOC);
+
+    // create the timer event
+    // XXX: using the sock module ev_base
+    if ((queue->ev = evtimer_new(_sock_stream_ctx.ev_base, &irc_queue_timer, queue)) == NULL)
+        JUMP_SET_ERROR(err, ERR_EVENT_NEW);
+
+    // initialize
+    queue->lp = lp;
+    queue->timer = time(NULL);
+    TAILQ_INIT(&queue->list); 
+
+    // ok
+    *queue_ptr = queue;
+
+    return SUCCESS;
+
+error:
+    // cleanup
+    free(queue);
+
+    return ERROR_CODE(err);    
+}
+
+/**
+ * Attempt to send a irc_line directly on the queue's line_proto, otherwise enqueueing it for later transmission.
+ */
+static err_t irc_queue_send_direct (struct irc_queue *queue, const struct irc_line *line)
+{
+    char line_buf[IRC_LINE_MAX + 2];
+    err_t err;
+
+    // format
+    if ((err = irc_line_build(line, line_buf)))
+        return err;
+    
+    // add CRLF
+    strcat(line_buf, "\r\n");
+
+    // send
+    // XXX: handle send-buffer-full by enqueuing it after all
+    return irc_queue_send_buf(queue, line_buf);
+}
+
+/**
+ * Enqueue a irc_line onto the queue's list, and schedule the timer for execution
+ */
+static err_t irc_queue_put (struct irc_queue *queue, const struct irc_line *line)
+{
+    struct irc_queue_entry *entry;
+    err_t err;
+
+    // alloc
+    if ((entry = calloc(1, sizeof(*entry))) == NULL)
+        return ERR_CALLOC;
+
+    // format
+    if ((err = irc_line_build(line, entry->line_buf)))
+        goto error;
+
+    // add CRLF
+    strcat(entry->line_buf, "\r\n");
+   
+    // then re-schedule the queue
+    if ((err = irc_queue_schedule(queue)))
+        goto error;
+
+    // append to end of list
+    TAILQ_INSERT_TAIL(&queue->list, entry, queue_list);
+ 
+    // ok
+    return SUCCESS;
+
+error:
+    // cleanup
+    free(entry);
+
+    return err;    
+}
+
+err_t irc_queue_process (struct irc_queue *queue, const struct irc_line *line)
+{
+    // current time
+    time_t now = time(NULL);
+
+    if (queue->timer < now + IRC_QUEUE_WINDOW) {
+        // timer is OK, send directly
+        return irc_queue_send_direct(queue, line);
+    
+    } else {
+        // enqueue for later transmission
+        return irc_queue_put(queue, line);
+    }
+}
+