src/irc_queue.c
author Tero Marttila <terom@fixme.fi>
Wed, 27 May 2009 23:57:48 +0300
branchnew-lib-errors
changeset 217 7728d6ec3abf
parent 155 c59d3eaff0fb
permissions -rw-r--r--
nexus.c compiles
#include "irc_queue.h"
#include "log.h"

#include <stdlib.h>
#include <string.h>
#include <time.h>

/**
 * Send a formatted line on the line_proto, and apply the IRC_QUEUE_PENALTY to the timer, resetting it to the current
 * time if it's fallen behind.
 */
static err_t irc_queue_send_buf (struct irc_queue *queue, const char *line_buf)
{
    time_t now = time(NULL);
    err_t err;
    int ret;
    
    // dump for debug output, without any newline
    log_debug("%.*s", (int) (strstr(line_buf, "\r\n") - line_buf), line_buf);
    
    // XXX: output buffering?
    if ((ret = line_proto_send(queue->lp, line_buf)) < 0)
        return (err = -ret);
    
    // reset timer to current time if it's fallen behind
    if (queue->timer < now)
        queue->timer = now;

    // apply penalty
    queue->timer += IRC_QUEUE_PENALTY;
    
    // ok
    return SUCCESS;
}

/**
 * Pump the queue, sending the next message. If it is given explicitly, then the given message is "pumped", and it is
 * assumed to not be in the queue_list. Otherwise, the next entry is taken from the queue.
 *
 * If the queue is empty and no entry is given, does nothing.
 */
static err_t irc_queue_pump (struct irc_queue *queue, struct irc_queue_entry *next)
{
    err_t err;

    // pop the next entry?
    if (next == NULL) {
        // take it from the head of the list
        next = TAILQ_FIRST(&queue->list);
    
        // nothing to do?
        if (!next)
            return SUCCESS;
        
        // and then remove it
        TAILQ_REMOVE(&queue->list, next, queue_list);
    }

    // send it
    if ((err = irc_queue_send_buf(queue, next->line_buf)))
        goto error;
    
    // ok, release it
    free(next);
    
    // done
    return SUCCESS;

error:
    // hmm... re-insert and return error, we can try again later
    TAILQ_INSERT_HEAD(&queue->list, next, queue_list);

    return err;    
}

/**
 * Schedule the queue's timer for execution once the timer has expired, or immediately if it's not full. If next is
 * given, it should be the next entry to send, and not part of the queue_list.
 */
static err_t irc_queue_schedule (struct irc_queue *queue)
{
    time_t now = time(NULL);
    struct timeval tv;
    
    // calculate the timeout
    int timeout = (queue->timer - (now + IRC_QUEUE_WINDOW));
    
    // setup the timeout, zero if we don't actually need to wait...
    tv.tv_sec = timeout > 0 ? timeout : 0;
    tv.tv_usec = 0;

    // setup the timer event
    if (evtimer_add(queue->ev, &tv))
        return ERR_EVENT_ADD;

    // ok
    return SUCCESS;
}

/**
 * Our timer callback
 */
static void irc_queue_timer (int fd, short what, void *arg)
{
    time_t now = time(NULL);
    struct irc_queue *queue = arg;
    err_t err;

    (void) fd;
    (void) what;

    // pump the queue until our timer is full again, or the queue is empty
    while (queue->timer <= now + IRC_QUEUE_WINDOW && !TAILQ_EMPTY(&queue->list)) {
        if ((err = irc_queue_pump(queue, NULL))) {
            log_warn("irc_queue_pump: %s", error_name(err));
            break;
        }
    }

    // reschedule if needed
    if (!TAILQ_EMPTY(&queue->list)) {
        if ((err = irc_queue_schedule(queue))) {
            log_err(err, "irc_queue_scheulde");
        }
    }
}

err_t irc_queue_create (struct irc_queue **queue_ptr, struct event_base *ev_base, struct line_proto *lp, error_t *err)
{
    struct irc_queue *queue;

    // alloc
    if ((queue = calloc(1, sizeof(*queue))) == NULL)
        return SET_ERROR(err, ERR_CALLOC);

    // create the timer event
    if ((queue->ev = evtimer_new(ev_base, &irc_queue_timer, queue)) == NULL)
        JUMP_SET_ERROR(err, ERR_EVENT_NEW);

    // initialize
    queue->lp = lp;
    queue->timer = time(NULL);
    TAILQ_INIT(&queue->list); 

    // ok
    *queue_ptr = queue;

    return SUCCESS;

error:
    // cleanup
    free(queue);

    return ERROR_CODE(err);    
}

/**
 * Attempt to send a irc_line directly on the queue's line_proto, otherwise enqueueing it for later transmission.
 */
static err_t irc_queue_send_direct (struct irc_queue *queue, const struct irc_line *line)
{
    char line_buf[IRC_LINE_MAX + 2];
    err_t err;

    // format
    if ((err = irc_line_build(line, line_buf)))
        return err;
    
    // add CRLF
    strcat(line_buf, "\r\n");

    // send
    // XXX: handle send-buffer-full by enqueuing it after all
    return irc_queue_send_buf(queue, line_buf);
}

/**
 * Enqueue a irc_line onto the queue's list, and schedule the timer for execution
 */
static err_t irc_queue_put (struct irc_queue *queue, const struct irc_line *line)
{
    struct irc_queue_entry *entry;
    err_t err;

    // alloc
    if ((entry = calloc(1, sizeof(*entry))) == NULL)
        return ERR_CALLOC;

    // format
    if ((err = irc_line_build(line, entry->line_buf)))
        goto error;

    // add CRLF
    strcat(entry->line_buf, "\r\n");
   
    // then re-schedule the queue
    if ((err = irc_queue_schedule(queue)))
        goto error;

    // append to end of list
    TAILQ_INSERT_TAIL(&queue->list, entry, queue_list);
 
    // ok
    return SUCCESS;

error:
    // cleanup
    free(entry);

    return err;    
}

err_t irc_queue_process (struct irc_queue *queue, const struct irc_line *line)
{
    // current time
    time_t now = time(NULL);

    if (queue->timer < now + IRC_QUEUE_WINDOW) {
        // timer is OK, send directly
        return irc_queue_send_direct(queue, line);
    
    } else {
        // enqueue for later transmission
        return irc_queue_put(queue, line);
    }
}

void irc_queue_destroy (struct irc_queue *queue)
{
    struct irc_queue_entry *entry, *next;

    // free all entries
    for (entry = TAILQ_FIRST(&queue->list); entry; entry = next) {
        next = TAILQ_NEXT(entry, queue_list);
        free(entry);
    }

    // the event
    event_free(queue->ev);
    
    // the queue itself
    free(queue);
}