|
1 #include "irc_queue.h" |
|
2 #include "log.h" |
|
3 |
|
4 // XXX: for ev_base |
|
5 #include "sock_internal.h" |
|
6 |
|
7 #include <stdlib.h> |
|
8 #include <string.h> |
|
9 #include <time.h> |
|
10 |
|
11 /** |
|
12 * Send a formatted line on the line_proto, and apply the IRC_QUEUE_PENALTY to the timer, resetting it to the current |
|
13 * time if it's fallen behind. |
|
14 */ |
|
15 static err_t irc_queue_send_buf (struct irc_queue *queue, const char *line_buf) |
|
16 { |
|
17 time_t now = time(NULL); |
|
18 err_t err; |
|
19 int ret; |
|
20 |
|
21 // XXX: output buffering? |
|
22 if ((ret = line_proto_send(queue->lp, line_buf)) < 0) |
|
23 return (err = -ret); |
|
24 |
|
25 // reset timer to current time if it's fallen behind |
|
26 if (queue->timer < now) |
|
27 queue->timer = now; |
|
28 |
|
29 // apply penalty |
|
30 queue->timer += IRC_QUEUE_PENALTY; |
|
31 |
|
32 // ok |
|
33 return SUCCESS; |
|
34 } |
|
35 |
|
36 /** |
|
37 * Pump the queue, sending the next message. If it is given explicitly, then the given message is "pumped", and it is |
|
38 * assumed to not be in the queue_list. Otherwise, the next entry is taken from the queue. |
|
39 * |
|
40 * If the queue is empty and no entry is given, does nothing. |
|
41 */ |
|
42 static err_t irc_queue_pump (struct irc_queue *queue, struct irc_queue_entry *next) |
|
43 { |
|
44 err_t err; |
|
45 |
|
46 // pop the next entry? |
|
47 if (next == NULL) { |
|
48 // take it from the head of the list |
|
49 next = TAILQ_FIRST(&queue->list); |
|
50 |
|
51 // nothing to do? |
|
52 if (!next) |
|
53 return SUCCESS; |
|
54 |
|
55 // and then remove it |
|
56 TAILQ_REMOVE(&queue->list, next, queue_list); |
|
57 } |
|
58 |
|
59 // send it |
|
60 if ((err = irc_queue_send_buf(queue, next->line_buf))) |
|
61 goto error; |
|
62 |
|
63 // ok, release it |
|
64 free(next); |
|
65 |
|
66 // done |
|
67 return SUCCESS; |
|
68 |
|
69 error: |
|
70 // hmm... re-insert and return error, we can try again later |
|
71 TAILQ_INSERT_HEAD(&queue->list, next, queue_list); |
|
72 |
|
73 return err; |
|
74 } |
|
75 |
|
76 /** |
|
77 * Schedule the queue's timer for execution once the timer has expired, or immediately if it's not full. If next is |
|
78 * given, it should be the next entry to send, and not part of the queue_list. |
|
79 */ |
|
80 static err_t irc_queue_schedule (struct irc_queue *queue) |
|
81 { |
|
82 time_t now = time(NULL); |
|
83 struct timeval tv; |
|
84 |
|
85 // calculate the timeout |
|
86 int timeout = (queue->timer - (now + IRC_QUEUE_WINDOW)); |
|
87 |
|
88 // setup the timeout, zero if we don't actually need to wait... |
|
89 tv.tv_sec = timeout > 0 ? timeout : 0; |
|
90 tv.tv_usec = 0; |
|
91 |
|
92 // setup the timer event |
|
93 if (evtimer_add(queue->ev, &tv)) |
|
94 return ERR_EVENT_ADD; |
|
95 |
|
96 // ok |
|
97 return SUCCESS; |
|
98 } |
|
99 |
|
100 /** |
|
101 * Our timer callback |
|
102 */ |
|
103 static void irc_queue_timer (int fd, short what, void *arg) |
|
104 { |
|
105 time_t now = time(NULL); |
|
106 struct irc_queue *queue = arg; |
|
107 err_t err; |
|
108 |
|
109 (void) fd; |
|
110 (void) what; |
|
111 |
|
112 // pump the queue until our timer is full again, or the queue is empty |
|
113 while (queue->timer <= now + IRC_QUEUE_WINDOW && !TAILQ_EMPTY(&queue->list)) { |
|
114 if ((err = irc_queue_pump(queue, NULL))) { |
|
115 log_warn("irc_queue_pump: %s", error_name(err)); |
|
116 break; |
|
117 } |
|
118 } |
|
119 |
|
120 // reschedule if needed |
|
121 if (!TAILQ_EMPTY(&queue->list)) { |
|
122 if ((err = irc_queue_schedule(queue))) { |
|
123 log_err(err, "irc_queue_scheulde"); |
|
124 } |
|
125 } |
|
126 } |
|
127 |
|
128 err_t irc_queue_create (struct irc_queue **queue_ptr, struct line_proto *lp, struct error_info *err) |
|
129 { |
|
130 struct irc_queue *queue; |
|
131 |
|
132 // alloc |
|
133 if ((queue = calloc(1, sizeof(*queue))) == NULL) |
|
134 return SET_ERROR(err, ERR_CALLOC); |
|
135 |
|
136 // create the timer event |
|
137 // XXX: using the sock module ev_base |
|
138 if ((queue->ev = evtimer_new(_sock_stream_ctx.ev_base, &irc_queue_timer, queue)) == NULL) |
|
139 JUMP_SET_ERROR(err, ERR_EVENT_NEW); |
|
140 |
|
141 // initialize |
|
142 queue->lp = lp; |
|
143 queue->timer = time(NULL); |
|
144 TAILQ_INIT(&queue->list); |
|
145 |
|
146 // ok |
|
147 *queue_ptr = queue; |
|
148 |
|
149 return SUCCESS; |
|
150 |
|
151 error: |
|
152 // cleanup |
|
153 free(queue); |
|
154 |
|
155 return ERROR_CODE(err); |
|
156 } |
|
157 |
|
158 /** |
|
159 * Attempt to send a irc_line directly on the queue's line_proto, otherwise enqueueing it for later transmission. |
|
160 */ |
|
161 static err_t irc_queue_send_direct (struct irc_queue *queue, const struct irc_line *line) |
|
162 { |
|
163 char line_buf[IRC_LINE_MAX + 2]; |
|
164 err_t err; |
|
165 |
|
166 // format |
|
167 if ((err = irc_line_build(line, line_buf))) |
|
168 return err; |
|
169 |
|
170 // add CRLF |
|
171 strcat(line_buf, "\r\n"); |
|
172 |
|
173 // send |
|
174 // XXX: handle send-buffer-full by enqueuing it after all |
|
175 return irc_queue_send_buf(queue, line_buf); |
|
176 } |
|
177 |
|
178 /** |
|
179 * Enqueue a irc_line onto the queue's list, and schedule the timer for execution |
|
180 */ |
|
181 static err_t irc_queue_put (struct irc_queue *queue, const struct irc_line *line) |
|
182 { |
|
183 struct irc_queue_entry *entry; |
|
184 err_t err; |
|
185 |
|
186 // alloc |
|
187 if ((entry = calloc(1, sizeof(*entry))) == NULL) |
|
188 return ERR_CALLOC; |
|
189 |
|
190 // format |
|
191 if ((err = irc_line_build(line, entry->line_buf))) |
|
192 goto error; |
|
193 |
|
194 // add CRLF |
|
195 strcat(entry->line_buf, "\r\n"); |
|
196 |
|
197 // then re-schedule the queue |
|
198 if ((err = irc_queue_schedule(queue))) |
|
199 goto error; |
|
200 |
|
201 // append to end of list |
|
202 TAILQ_INSERT_TAIL(&queue->list, entry, queue_list); |
|
203 |
|
204 // ok |
|
205 return SUCCESS; |
|
206 |
|
207 error: |
|
208 // cleanup |
|
209 free(entry); |
|
210 |
|
211 return err; |
|
212 } |
|
213 |
|
214 err_t irc_queue_process (struct irc_queue *queue, const struct irc_line *line) |
|
215 { |
|
216 // current time |
|
217 time_t now = time(NULL); |
|
218 |
|
219 if (queue->timer < now + IRC_QUEUE_WINDOW) { |
|
220 // timer is OK, send directly |
|
221 return irc_queue_send_direct(queue, line); |
|
222 |
|
223 } else { |
|
224 // enqueue for later transmission |
|
225 return irc_queue_put(queue, line); |
|
226 } |
|
227 } |
|
228 |