terom@90: #include "irc_queue.h" terom@90: #include "log.h" terom@90: terom@90: #include terom@90: #include terom@90: #include terom@90: terom@90: /** terom@90: * Send a formatted line on the line_proto, and apply the IRC_QUEUE_PENALTY to the timer, resetting it to the current terom@90: * time if it's fallen behind. terom@90: */ terom@90: static err_t irc_queue_send_buf (struct irc_queue *queue, const char *line_buf) terom@90: { terom@90: time_t now = time(NULL); terom@90: err_t err; terom@90: int ret; terom@151: terom@151: // dump for debug output, without any newline terom@151: log_debug("%.*s", (int) (strstr(line_buf, "\r\n") - line_buf), line_buf); terom@90: terom@90: // XXX: output buffering? terom@90: if ((ret = line_proto_send(queue->lp, line_buf)) < 0) terom@90: return (err = -ret); terom@90: terom@90: // reset timer to current time if it's fallen behind terom@90: if (queue->timer < now) terom@90: queue->timer = now; terom@90: terom@90: // apply penalty terom@90: queue->timer += IRC_QUEUE_PENALTY; terom@90: terom@90: // ok terom@90: return SUCCESS; terom@90: } terom@90: terom@90: /** terom@90: * Pump the queue, sending the next message. If it is given explicitly, then the given message is "pumped", and it is terom@90: * assumed to not be in the queue_list. Otherwise, the next entry is taken from the queue. terom@90: * terom@90: * If the queue is empty and no entry is given, does nothing. terom@90: */ terom@90: static err_t irc_queue_pump (struct irc_queue *queue, struct irc_queue_entry *next) terom@90: { terom@90: err_t err; terom@90: terom@90: // pop the next entry? terom@90: if (next == NULL) { terom@90: // take it from the head of the list terom@90: next = TAILQ_FIRST(&queue->list); terom@90: terom@90: // nothing to do? terom@90: if (!next) terom@90: return SUCCESS; terom@90: terom@90: // and then remove it terom@90: TAILQ_REMOVE(&queue->list, next, queue_list); terom@90: } terom@90: terom@90: // send it terom@90: if ((err = irc_queue_send_buf(queue, next->line_buf))) terom@90: goto error; terom@90: terom@90: // ok, release it terom@90: free(next); terom@90: terom@90: // done terom@90: return SUCCESS; terom@90: terom@90: error: terom@90: // hmm... re-insert and return error, we can try again later terom@90: TAILQ_INSERT_HEAD(&queue->list, next, queue_list); terom@90: terom@90: return err; terom@90: } terom@90: terom@90: /** terom@90: * Schedule the queue's timer for execution once the timer has expired, or immediately if it's not full. If next is terom@90: * given, it should be the next entry to send, and not part of the queue_list. terom@90: */ terom@90: static err_t irc_queue_schedule (struct irc_queue *queue) terom@90: { terom@90: time_t now = time(NULL); terom@90: struct timeval tv; terom@90: terom@90: // calculate the timeout terom@90: int timeout = (queue->timer - (now + IRC_QUEUE_WINDOW)); terom@90: terom@90: // setup the timeout, zero if we don't actually need to wait... terom@90: tv.tv_sec = timeout > 0 ? timeout : 0; terom@90: tv.tv_usec = 0; terom@90: terom@90: // setup the timer event terom@90: if (evtimer_add(queue->ev, &tv)) terom@90: return ERR_EVENT_ADD; terom@90: terom@90: // ok terom@90: return SUCCESS; terom@90: } terom@90: terom@90: /** terom@90: * Our timer callback terom@90: */ terom@90: static void irc_queue_timer (int fd, short what, void *arg) terom@90: { terom@90: time_t now = time(NULL); terom@90: struct irc_queue *queue = arg; terom@90: err_t err; terom@90: terom@90: (void) fd; terom@90: (void) what; terom@90: terom@90: // pump the queue until our timer is full again, or the queue is empty terom@90: while (queue->timer <= now + IRC_QUEUE_WINDOW && !TAILQ_EMPTY(&queue->list)) { terom@90: if ((err = irc_queue_pump(queue, NULL))) { terom@90: log_warn("irc_queue_pump: %s", error_name(err)); terom@90: break; terom@90: } terom@90: } terom@90: terom@90: // reschedule if needed terom@90: if (!TAILQ_EMPTY(&queue->list)) { terom@90: if ((err = irc_queue_schedule(queue))) { terom@90: log_err(err, "irc_queue_scheulde"); terom@90: } terom@90: } terom@90: } terom@90: terom@155: err_t irc_queue_create (struct irc_queue **queue_ptr, struct event_base *ev_base, struct line_proto *lp, struct error_info *err) terom@90: { terom@90: struct irc_queue *queue; terom@90: terom@90: // alloc terom@90: if ((queue = calloc(1, sizeof(*queue))) == NULL) terom@90: return SET_ERROR(err, ERR_CALLOC); terom@90: terom@90: // create the timer event terom@155: if ((queue->ev = evtimer_new(ev_base, &irc_queue_timer, queue)) == NULL) terom@90: JUMP_SET_ERROR(err, ERR_EVENT_NEW); terom@90: terom@90: // initialize terom@90: queue->lp = lp; terom@90: queue->timer = time(NULL); terom@90: TAILQ_INIT(&queue->list); terom@90: terom@90: // ok terom@90: *queue_ptr = queue; terom@90: terom@90: return SUCCESS; terom@90: terom@90: error: terom@90: // cleanup terom@90: free(queue); terom@90: terom@90: return ERROR_CODE(err); terom@90: } terom@90: terom@90: /** terom@90: * Attempt to send a irc_line directly on the queue's line_proto, otherwise enqueueing it for later transmission. terom@90: */ terom@90: static err_t irc_queue_send_direct (struct irc_queue *queue, const struct irc_line *line) terom@90: { terom@90: char line_buf[IRC_LINE_MAX + 2]; terom@90: err_t err; terom@90: terom@90: // format terom@90: if ((err = irc_line_build(line, line_buf))) terom@90: return err; terom@90: terom@90: // add CRLF terom@90: strcat(line_buf, "\r\n"); terom@90: terom@90: // send terom@90: // XXX: handle send-buffer-full by enqueuing it after all terom@90: return irc_queue_send_buf(queue, line_buf); terom@90: } terom@90: terom@90: /** terom@90: * Enqueue a irc_line onto the queue's list, and schedule the timer for execution terom@90: */ terom@90: static err_t irc_queue_put (struct irc_queue *queue, const struct irc_line *line) terom@90: { terom@90: struct irc_queue_entry *entry; terom@90: err_t err; terom@90: terom@90: // alloc terom@90: if ((entry = calloc(1, sizeof(*entry))) == NULL) terom@90: return ERR_CALLOC; terom@90: terom@90: // format terom@90: if ((err = irc_line_build(line, entry->line_buf))) terom@90: goto error; terom@90: terom@90: // add CRLF terom@90: strcat(entry->line_buf, "\r\n"); terom@90: terom@90: // then re-schedule the queue terom@90: if ((err = irc_queue_schedule(queue))) terom@90: goto error; terom@90: terom@90: // append to end of list terom@90: TAILQ_INSERT_TAIL(&queue->list, entry, queue_list); terom@90: terom@90: // ok terom@90: return SUCCESS; terom@90: terom@90: error: terom@90: // cleanup terom@90: free(entry); terom@90: terom@90: return err; terom@90: } terom@90: terom@90: err_t irc_queue_process (struct irc_queue *queue, const struct irc_line *line) terom@90: { terom@90: // current time terom@90: time_t now = time(NULL); terom@90: terom@90: if (queue->timer < now + IRC_QUEUE_WINDOW) { terom@90: // timer is OK, send directly terom@90: return irc_queue_send_direct(queue, line); terom@90: terom@90: } else { terom@90: // enqueue for later transmission terom@90: return irc_queue_put(queue, line); terom@90: } terom@90: } terom@90: terom@91: void irc_queue_destroy (struct irc_queue *queue) terom@91: { terom@91: struct irc_queue_entry *entry, *next; terom@91: terom@91: // free all entries terom@91: for (entry = TAILQ_FIRST(&queue->list); entry; entry = next) { terom@91: next = TAILQ_NEXT(entry, queue_list); terom@91: free(entry); terom@91: } terom@91: terom@91: // the event terom@91: event_free(queue->ev); terom@91: terom@91: // the queue itself terom@91: free(queue); terom@91: } terom@91: