#include "irc_queue.h"
#include "log.h"
#include <stdlib.h>
#include <string.h>
#include <time.h>
/**
* Send a formatted line on the line_proto, and apply the IRC_QUEUE_PENALTY to the timer, resetting it to the current
* time if it's fallen behind.
*/
static err_t irc_queue_send_buf (struct irc_queue *queue, const char *line_buf)
{
time_t now = time(NULL);
err_t err;
int ret;
// dump for debug output, without any newline
log_debug("%.*s", (int) (strstr(line_buf, "\r\n") - line_buf), line_buf);
// XXX: output buffering?
if ((ret = line_proto_send(queue->lp, line_buf)) < 0)
return (err = -ret);
// reset timer to current time if it's fallen behind
if (queue->timer < now)
queue->timer = now;
// apply penalty
queue->timer += IRC_QUEUE_PENALTY;
// ok
return SUCCESS;
}
/**
* Pump the queue, sending the next message. If it is given explicitly, then the given message is "pumped", and it is
* assumed to not be in the queue_list. Otherwise, the next entry is taken from the queue.
*
* If the queue is empty and no entry is given, does nothing.
*/
static err_t irc_queue_pump (struct irc_queue *queue, struct irc_queue_entry *next)
{
err_t err;
// pop the next entry?
if (next == NULL) {
// take it from the head of the list
next = TAILQ_FIRST(&queue->list);
// nothing to do?
if (!next)
return SUCCESS;
// and then remove it
TAILQ_REMOVE(&queue->list, next, queue_list);
}
// send it
if ((err = irc_queue_send_buf(queue, next->line_buf)))
goto error;
// ok, release it
free(next);
// done
return SUCCESS;
error:
// hmm... re-insert and return error, we can try again later
TAILQ_INSERT_HEAD(&queue->list, next, queue_list);
return err;
}
/**
* Schedule the queue's timer for execution once the timer has expired, or immediately if it's not full. If next is
* given, it should be the next entry to send, and not part of the queue_list.
*/
static err_t irc_queue_schedule (struct irc_queue *queue)
{
time_t now = time(NULL);
struct timeval tv;
// calculate the timeout
int timeout = (queue->timer - (now + IRC_QUEUE_WINDOW));
// setup the timeout, zero if we don't actually need to wait...
tv.tv_sec = timeout > 0 ? timeout : 0;
tv.tv_usec = 0;
// setup the timer event
if (evtimer_add(queue->ev, &tv))
return ERR_EVENT_ADD;
// ok
return SUCCESS;
}
/**
* Our timer callback
*/
static void irc_queue_timer (int fd, short what, void *arg)
{
time_t now = time(NULL);
struct irc_queue *queue = arg;
err_t err;
(void) fd;
(void) what;
// pump the queue until our timer is full again, or the queue is empty
while (queue->timer <= now + IRC_QUEUE_WINDOW && !TAILQ_EMPTY(&queue->list)) {
if ((err = irc_queue_pump(queue, NULL))) {
log_warn("irc_queue_pump: %s", error_name(err));
break;
}
}
// reschedule if needed
if (!TAILQ_EMPTY(&queue->list)) {
if ((err = irc_queue_schedule(queue))) {
log_err(err, "irc_queue_scheulde");
}
}
}
err_t irc_queue_create (struct irc_queue **queue_ptr, struct event_base *ev_base, struct line_proto *lp, error_t *err)
{
struct irc_queue *queue;
// alloc
if ((queue = calloc(1, sizeof(*queue))) == NULL)
return SET_ERROR(err, ERR_CALLOC);
// create the timer event
if ((queue->ev = evtimer_new(ev_base, &irc_queue_timer, queue)) == NULL)
JUMP_SET_ERROR(err, ERR_EVENT_NEW);
// initialize
queue->lp = lp;
queue->timer = time(NULL);
TAILQ_INIT(&queue->list);
// ok
*queue_ptr = queue;
return SUCCESS;
error:
// cleanup
free(queue);
return ERROR_CODE(err);
}
/**
* Attempt to send a irc_line directly on the queue's line_proto, otherwise enqueueing it for later transmission.
*/
static err_t irc_queue_send_direct (struct irc_queue *queue, const struct irc_line *line)
{
char line_buf[IRC_LINE_MAX + 2];
err_t err;
// format
if ((err = irc_line_build(line, line_buf)))
return err;
// add CRLF
strcat(line_buf, "\r\n");
// send
// XXX: handle send-buffer-full by enqueuing it after all
return irc_queue_send_buf(queue, line_buf);
}
/**
* Enqueue a irc_line onto the queue's list, and schedule the timer for execution
*/
static err_t irc_queue_put (struct irc_queue *queue, const struct irc_line *line)
{
struct irc_queue_entry *entry;
err_t err;
// alloc
if ((entry = calloc(1, sizeof(*entry))) == NULL)
return ERR_CALLOC;
// format
if ((err = irc_line_build(line, entry->line_buf)))
goto error;
// add CRLF
strcat(entry->line_buf, "\r\n");
// then re-schedule the queue
if ((err = irc_queue_schedule(queue)))
goto error;
// append to end of list
TAILQ_INSERT_TAIL(&queue->list, entry, queue_list);
// ok
return SUCCESS;
error:
// cleanup
free(entry);
return err;
}
err_t irc_queue_process (struct irc_queue *queue, const struct irc_line *line)
{
// current time
time_t now = time(NULL);
if (queue->timer < now + IRC_QUEUE_WINDOW) {
// timer is OK, send directly
return irc_queue_send_direct(queue, line);
} else {
// enqueue for later transmission
return irc_queue_put(queue, line);
}
}
void irc_queue_destroy (struct irc_queue *queue)
{
struct irc_queue_entry *entry, *next;
// free all entries
for (entry = TAILQ_FIRST(&queue->list); entry; entry = next) {
next = TAILQ_NEXT(entry, queue_list);
free(entry);
}
// the event
event_free(queue->ev);
// the queue itself
free(queue);
}