|
1 |
|
2 #include "line_proto.h" |
|
3 #include "log.h" |
|
4 |
|
5 #include <string.h> |
|
6 #include <stdlib.h> |
|
7 #include <assert.h> |
|
8 |
|
9 /* |
|
10 * Our state |
|
11 */ |
|
12 struct line_proto { |
|
13 /* The transport we read/write with */ |
|
14 transport_t *transport; |
|
15 |
|
16 /* The incoming/outgoing line buffer */ |
|
17 char *in_buf, *out_buf; |
|
18 |
|
19 /* Buffer size (same for both) */ |
|
20 size_t buf_len; |
|
21 |
|
22 /* Offset of trailing data in buf */ |
|
23 size_t tail_offset; |
|
24 |
|
25 /* Length of trailing data in buf, if any */ |
|
26 size_t tail_len; |
|
27 |
|
28 /* Amount of data in the out buffer */ |
|
29 size_t out_offset; |
|
30 |
|
31 /* Last error */ |
|
32 struct error_info err; |
|
33 |
|
34 /* Callback info */ |
|
35 struct line_proto_callbacks callbacks; |
|
36 void *cb_arg; |
|
37 }; |
|
38 |
|
39 /** |
|
40 * An error occured which we could not recover from; the line_proto should now be considered corrupt. |
|
41 * |
|
42 * Notify the user callback, which will probably call line_proto_release(). |
|
43 */ |
|
44 static void line_proto_set_error (struct line_proto *lp) |
|
45 { |
|
46 // copy error_info, as it might get free'd |
|
47 struct error_info err = lp->err; |
|
48 |
|
49 // trigger callback |
|
50 lp->callbacks.on_error(&err, lp->cb_arg); |
|
51 } |
|
52 |
|
53 /** |
|
54 * Our transport_callbacks::on_read handler |
|
55 */ |
|
56 static void line_proto_on_read (transport_t *transport, void *arg) |
|
57 { |
|
58 struct line_proto *lp = arg; |
|
59 char *line; |
|
60 |
|
61 (void) transport; |
|
62 |
|
63 // sanity-check |
|
64 assert(lp->tail_offset < lp->buf_len); |
|
65 |
|
66 do { |
|
67 // attempt to read a line |
|
68 if (line_proto_recv(lp, &line)) |
|
69 // faaail |
|
70 return line_proto_set_error(lp); |
|
71 |
|
72 // got a line? |
|
73 if (line) |
|
74 lp->callbacks.on_line(line, lp->cb_arg); |
|
75 |
|
76 } while (line); |
|
77 } |
|
78 |
|
79 /* |
|
80 * Signal for write |
|
81 */ |
|
82 static void line_proto_on_write (transport_t *transport, void *arg) |
|
83 { |
|
84 struct line_proto *lp = arg; |
|
85 int ret; |
|
86 |
|
87 (void) transport; |
|
88 |
|
89 // just flush |
|
90 if ((ret = line_proto_flush(lp)) < 0) |
|
91 // faaail |
|
92 return line_proto_set_error(lp); |
|
93 } |
|
94 |
|
95 // XXX: implement on_error! |
|
96 static const struct transport_callbacks line_proto_transport_callbacks = { |
|
97 .on_read = &line_proto_on_read, |
|
98 .on_write = &line_proto_on_write, |
|
99 }; |
|
100 |
|
101 err_t line_proto_create (struct line_proto **lp_ptr, transport_t *transport, size_t buf_size, |
|
102 const struct line_proto_callbacks *callbacks, void *cb_arg, error_t *err) |
|
103 { |
|
104 struct line_proto *lp; |
|
105 |
|
106 // alloc |
|
107 if ((lp = calloc(1, sizeof(*lp))) == NULL) |
|
108 return SET_ERROR(err, ERR_CALLOC); |
|
109 |
|
110 // store |
|
111 lp->transport = transport; |
|
112 lp->buf_len = buf_size; |
|
113 lp->callbacks = *callbacks; |
|
114 lp->cb_arg = cb_arg; |
|
115 |
|
116 // allocate buffers |
|
117 if ( |
|
118 (lp->in_buf = malloc(buf_size)) == NULL |
|
119 || (lp->out_buf = malloc(buf_size)) == NULL |
|
120 ) |
|
121 JUMP_SET_ERROR(err, ERR_CALLOC); |
|
122 |
|
123 // setup the transport |
|
124 transport_set_callbacks(transport, &line_proto_transport_callbacks, lp); |
|
125 |
|
126 if ((ERROR_CODE(err) = transport_events(transport, TRANSPORT_READ | TRANSPORT_WRITE))) |
|
127 goto error; |
|
128 |
|
129 // return |
|
130 *lp_ptr = lp; |
|
131 |
|
132 return SUCCESS; |
|
133 |
|
134 error: |
|
135 // cleanup the lp |
|
136 line_proto_destroy(lp); |
|
137 |
|
138 return ERROR_CODE(err); |
|
139 } |
|
140 |
|
/*
 * Look for a complete line at the beginning of the given buffer, terminated by either '\r\n' or a bare '\n'.
 *
 * If one is found, the first byte of the terminator is overwritten with '\0', and the offset of the byte following
 * the terminator (i.e. the start of the next line) is returned. If not, zero is returned (which is never a valid
 * next-line offset), and *hint is set to the offset of the last byte in the buffer, so that a trailing '\r' is
 * looked at again once more data has arrived.
 *
 * Scanning starts at offset *hint rather than at zero, which lets incremental invocations on the same, growing
 * buffer skip what was already scanned. For an empty buffer (len == 0), zero is returned and *hint is not touched.
 */
int _parse_line (char *buf, size_t len, size_t *hint) {
    size_t from, eol, next;
    char *lf = NULL;

    // empty buffer -> nothing
    if (len == 0)
        return 0;

    from = *hint;

    // both kinds of terminator end in '\n', so locate the first one at or beyond the hint
    if (from < len)
        lf = memchr(buf + from, '\n', len - from);

    // no terminator yet?
    if (lf == NULL) {
        // continue one char back next time, to keep any '\r'
        *hint = len - 1;
        return 0;
    }

    eol = (size_t) (lf - buf);
    next = eol + 1;

    // a '\r' directly before the '\n' is part of the terminator, provided it lies within the scanned range
    if (eol > from && buf[eol - 1] == '\r')
        eol--;

    // mangle the newline off
    buf[eol] = '\0';

    return next;
}
|
183 |
|
184 err_t line_proto_recv (struct line_proto *lp, char **line_ptr) |
|
185 { |
|
186 // offset to recv() new data into, offset to _parse_line hint, offset to next line from _parse_line |
|
187 size_t recv_offset = 0, peek_offset = 0, next_offset = 0; |
|
188 int ret; |
|
189 |
|
190 // adjust offset to beyond previous data (as will be moved next) |
|
191 recv_offset = lp->tail_len; |
|
192 |
|
193 // move trailing data from previous line to front of buffer |
|
194 if (lp->tail_offset) { |
|
195 // move to front, no-op if tail_len is zero |
|
196 memmove(lp->in_buf, lp->in_buf + lp->tail_offset, lp->tail_len); |
|
197 |
|
198 // reset |
|
199 lp->tail_offset = 0; |
|
200 lp->tail_len = 0; |
|
201 } |
|
202 |
|
203 // readline loop |
|
204 do { |
|
205 // parse any line at the beginning of the buffer |
|
206 if ((next_offset = _parse_line(lp->in_buf, recv_offset, &peek_offset)) > 0) { |
|
207 // store a valid *line_ptr |
|
208 *line_ptr = lp->in_buf; |
|
209 |
|
210 // exit loop and return |
|
211 break; |
|
212 } |
|
213 |
|
214 // ensure there's enough space for the rest of the line |
|
215 if (recv_offset >= lp->buf_len) |
|
216 return ERR_LINE_TOO_LONG; |
|
217 |
|
218 // otherwise, read more data |
|
219 if ((ret = transport_read(lp->transport, lp->in_buf + recv_offset, lp->buf_len - recv_offset, &lp->err)) < 0) |
|
220 return ERROR_CODE(&lp->err); |
|
221 |
|
222 // EAGAIN? |
|
223 if (ret == 0) { |
|
224 // return a NULL *line_ptr |
|
225 *line_ptr = NULL; |
|
226 break; |
|
227 } |
|
228 |
|
229 // update recv_offset |
|
230 recv_offset += ret; |
|
231 |
|
232 } while (1); |
|
233 |
|
234 // update state for next call |
|
235 lp->tail_offset = next_offset; |
|
236 lp->tail_len = recv_offset - next_offset; |
|
237 |
|
238 // ok |
|
239 return SUCCESS; |
|
240 } |
|
241 |
|
242 int line_proto_send (struct line_proto *lp, const char *line) |
|
243 { |
|
244 int ret; |
|
245 size_t len = strlen(line), ret_len; |
|
246 |
|
247 // drop line if we already have output buffered |
|
248 if (lp->out_offset) |
|
249 return -ERR_LINE_TOO_LONG; |
|
250 |
|
251 // try and write the line |
|
252 if ((ret = transport_write(lp->transport, line, len, &lp->err)) < 0) |
|
253 return -ERROR_CODE(&lp->err); |
|
254 |
|
255 // length of the sent data |
|
256 ret_len = ret; |
|
257 |
|
258 // EAGAIN or partial? |
|
259 if (ret_len < len) { |
|
260 size_t trailing = len - ret_len; |
|
261 |
|
262 // ensure it's not waaaay too long |
|
263 if (trailing > lp->buf_len) |
|
264 return -ERR_LINE_TOO_LONG; |
|
265 |
|
266 // copy remaining portion to buffer |
|
267 memcpy(lp->out_buf, line + ret_len, trailing); |
|
268 |
|
269 // update offset |
|
270 lp->out_offset = trailing; |
|
271 |
|
272 // buffered... transport should invoke on_write itself |
|
273 return 1; |
|
274 |
|
275 } else { |
|
276 // ok, no buffering needed |
|
277 return SUCCESS; |
|
278 |
|
279 } |
|
280 } |
|
281 |
|
282 int line_proto_flush (struct line_proto *lp) |
|
283 { |
|
284 int ret; |
|
285 size_t ret_len; |
|
286 |
|
287 assert(lp->out_offset); |
|
288 |
|
289 // try and write the line |
|
290 if ((ret = transport_write(lp->transport, lp->out_buf, lp->out_offset, &lp->err)) < 0) |
|
291 return -ERROR_CODE(&lp->err); |
|
292 |
|
293 ret_len = ret; |
|
294 |
|
295 // empty now? |
|
296 if (ret_len == lp->out_offset) { |
|
297 lp->out_offset = 0; |
|
298 |
|
299 return SUCCESS; |
|
300 } |
|
301 |
|
302 // partial? |
|
303 if (ret_len > 0) { |
|
304 size_t remaining = lp->out_offset - ret_len; |
|
305 |
|
306 // move the rest up |
|
307 memmove(lp->out_buf, lp->out_buf + ret_len, remaining); |
|
308 |
|
309 // update offset |
|
310 lp->out_offset = remaining; |
|
311 } |
|
312 |
|
313 // ok |
|
314 return 1; |
|
315 } |
|
316 |
|
317 const struct error_info* line_proto_error (struct line_proto *lp) |
|
318 { |
|
319 // return pointer |
|
320 return &lp->err; |
|
321 } |
|
322 |
|
323 void line_proto_destroy (struct line_proto *lp) |
|
324 { |
|
325 // free buffers |
|
326 free(lp->in_buf); |
|
327 free(lp->out_buf); |
|
328 |
|
329 // socket? |
|
330 if (lp->transport) |
|
331 transport_destroy(lp->transport); |
|
332 |
|
333 // free the state itself |
|
334 free(lp); |
|
335 } |
|
336 |