src/irc_queue.c
changeset 90 9d489b1039b2
child 91 bca23cbe1dce
equal deleted inserted replaced
89:68345a9b99a3 90:9d489b1039b2
       
     1 #include "irc_queue.h"
       
     2 #include "log.h"
       
     3 
       
     4 // XXX: for ev_base
       
     5 #include "sock_internal.h"
       
     6 
       
     7 #include <stdlib.h>
       
     8 #include <string.h>
       
     9 #include <time.h>
       
    10 
       
    11 /**
       
    12  * Send a formatted line on the line_proto, and apply the IRC_QUEUE_PENALTY to the timer, resetting it to the current
       
    13  * time if it's fallen behind.
       
    14  */
       
    15 static err_t irc_queue_send_buf (struct irc_queue *queue, const char *line_buf)
       
    16 {
       
    17     time_t now = time(NULL);
       
    18     err_t err;
       
    19     int ret;
       
    20     
       
    21     // XXX: output buffering?
       
    22     if ((ret = line_proto_send(queue->lp, line_buf)) < 0)
       
    23         return (err = -ret);
       
    24     
       
    25     // reset timer to current time if it's fallen behind
       
    26     if (queue->timer < now)
       
    27         queue->timer = now;
       
    28 
       
    29     // apply penalty
       
    30     queue->timer += IRC_QUEUE_PENALTY;
       
    31     
       
    32     // ok
       
    33     return SUCCESS;
       
    34 }
       
    35 
       
    36 /**
       
    37  * Pump the queue, sending the next message. If it is given explicitly, then the given message is "pumped", and it is
       
    38  * assumed to not be in the queue_list. Otherwise, the next entry is taken from the queue.
       
    39  *
       
    40  * If the queue is empty and no entry is given, does nothing.
       
    41  */
       
    42 static err_t irc_queue_pump (struct irc_queue *queue, struct irc_queue_entry *next)
       
    43 {
       
    44     err_t err;
       
    45 
       
    46     // pop the next entry?
       
    47     if (next == NULL) {
       
    48         // take it from the head of the list
       
    49         next = TAILQ_FIRST(&queue->list);
       
    50     
       
    51         // nothing to do?
       
    52         if (!next)
       
    53             return SUCCESS;
       
    54         
       
    55         // and then remove it
       
    56         TAILQ_REMOVE(&queue->list, next, queue_list);
       
    57     }
       
    58 
       
    59     // send it
       
    60     if ((err = irc_queue_send_buf(queue, next->line_buf)))
       
    61         goto error;
       
    62     
       
    63     // ok, release it
       
    64     free(next);
       
    65     
       
    66     // done
       
    67     return SUCCESS;
       
    68 
       
    69 error:
       
    70     // hmm... re-insert and return error, we can try again later
       
    71     TAILQ_INSERT_HEAD(&queue->list, next, queue_list);
       
    72 
       
    73     return err;    
       
    74 }
       
    75 
       
    76 /**
       
    77  * Schedule the queue's timer for execution once the timer has expired, or immediately if it's not full. If next is
       
    78  * given, it should be the next entry to send, and not part of the queue_list.
       
    79  */
       
    80 static err_t irc_queue_schedule (struct irc_queue *queue)
       
    81 {
       
    82     time_t now = time(NULL);
       
    83     struct timeval tv;
       
    84     
       
    85     // calculate the timeout
       
    86     int timeout = (queue->timer - (now + IRC_QUEUE_WINDOW));
       
    87     
       
    88     // setup the timeout, zero if we don't actually need to wait...
       
    89     tv.tv_sec = timeout > 0 ? timeout : 0;
       
    90     tv.tv_usec = 0;
       
    91 
       
    92     // setup the timer event
       
    93     if (evtimer_add(queue->ev, &tv))
       
    94         return ERR_EVENT_ADD;
       
    95 
       
    96     // ok
       
    97     return SUCCESS;
       
    98 }
       
    99 
       
   100 /**
       
   101  * Our timer callback
       
   102  */
       
   103 static void irc_queue_timer (int fd, short what, void *arg)
       
   104 {
       
   105     time_t now = time(NULL);
       
   106     struct irc_queue *queue = arg;
       
   107     err_t err;
       
   108 
       
   109     (void) fd;
       
   110     (void) what;
       
   111 
       
   112     // pump the queue until our timer is full again, or the queue is empty
       
   113     while (queue->timer <= now + IRC_QUEUE_WINDOW && !TAILQ_EMPTY(&queue->list)) {
       
   114         if ((err = irc_queue_pump(queue, NULL))) {
       
   115             log_warn("irc_queue_pump: %s", error_name(err));
       
   116             break;
       
   117         }
       
   118     }
       
   119 
       
   120     // reschedule if needed
       
   121     if (!TAILQ_EMPTY(&queue->list)) {
       
   122         if ((err = irc_queue_schedule(queue))) {
       
   123             log_err(err, "irc_queue_scheulde");
       
   124         }
       
   125     }
       
   126 }
       
   127 
       
   128 err_t irc_queue_create (struct irc_queue **queue_ptr, struct line_proto *lp, struct error_info *err)
       
   129 {
       
   130     struct irc_queue *queue;
       
   131 
       
   132     // alloc
       
   133     if ((queue = calloc(1, sizeof(*queue))) == NULL)
       
   134         return SET_ERROR(err, ERR_CALLOC);
       
   135 
       
   136     // create the timer event
       
   137     // XXX: using the sock module ev_base
       
   138     if ((queue->ev = evtimer_new(_sock_stream_ctx.ev_base, &irc_queue_timer, queue)) == NULL)
       
   139         JUMP_SET_ERROR(err, ERR_EVENT_NEW);
       
   140 
       
   141     // initialize
       
   142     queue->lp = lp;
       
   143     queue->timer = time(NULL);
       
   144     TAILQ_INIT(&queue->list); 
       
   145 
       
   146     // ok
       
   147     *queue_ptr = queue;
       
   148 
       
   149     return SUCCESS;
       
   150 
       
   151 error:
       
   152     // cleanup
       
   153     free(queue);
       
   154 
       
   155     return ERROR_CODE(err);    
       
   156 }
       
   157 
       
   158 /**
       
   159  * Attempt to send a irc_line directly on the queue's line_proto, otherwise enqueueing it for later transmission.
       
   160  */
       
   161 static err_t irc_queue_send_direct (struct irc_queue *queue, const struct irc_line *line)
       
   162 {
       
   163     char line_buf[IRC_LINE_MAX + 2];
       
   164     err_t err;
       
   165 
       
   166     // format
       
   167     if ((err = irc_line_build(line, line_buf)))
       
   168         return err;
       
   169     
       
   170     // add CRLF
       
   171     strcat(line_buf, "\r\n");
       
   172 
       
   173     // send
       
   174     // XXX: handle send-buffer-full by enqueuing it after all
       
   175     return irc_queue_send_buf(queue, line_buf);
       
   176 }
       
   177 
       
   178 /**
       
   179  * Enqueue a irc_line onto the queue's list, and schedule the timer for execution
       
   180  */
       
   181 static err_t irc_queue_put (struct irc_queue *queue, const struct irc_line *line)
       
   182 {
       
   183     struct irc_queue_entry *entry;
       
   184     err_t err;
       
   185 
       
   186     // alloc
       
   187     if ((entry = calloc(1, sizeof(*entry))) == NULL)
       
   188         return ERR_CALLOC;
       
   189 
       
   190     // format
       
   191     if ((err = irc_line_build(line, entry->line_buf)))
       
   192         goto error;
       
   193 
       
   194     // add CRLF
       
   195     strcat(entry->line_buf, "\r\n");
       
   196    
       
   197     // then re-schedule the queue
       
   198     if ((err = irc_queue_schedule(queue)))
       
   199         goto error;
       
   200 
       
   201     // append to end of list
       
   202     TAILQ_INSERT_TAIL(&queue->list, entry, queue_list);
       
   203  
       
   204     // ok
       
   205     return SUCCESS;
       
   206 
       
   207 error:
       
   208     // cleanup
       
   209     free(entry);
       
   210 
       
   211     return err;    
       
   212 }
       
   213 
       
   214 err_t irc_queue_process (struct irc_queue *queue, const struct irc_line *line)
       
   215 {
       
   216     // current time
       
   217     time_t now = time(NULL);
       
   218 
       
   219     if (queue->timer < now + IRC_QUEUE_WINDOW) {
       
   220         // timer is OK, send directly
       
   221         return irc_queue_send_direct(queue, line);
       
   222     
       
   223     } else {
       
   224         // enqueue for later transmission
       
   225         return irc_queue_put(queue, line);
       
   226     }
       
   227 }
       
   228