12 */ |
12 */ |
13 struct line_proto { |
13 struct line_proto { |
14 /* The sock_stream we read/write with */ |
14 /* The sock_stream we read/write with */ |
15 struct sock_stream *sock; |
15 struct sock_stream *sock; |
16 |
16 |
17 /* The incoming line buffer */ |
17 /* The incoming/outgoing line buffer */ |
18 char *buf; |
18 char *in_buf, *out_buf; |
19 |
19 |
20 /* Incoming buffer size */ |
20 /* Buffer size (same for both) */ |
21 size_t buf_len; |
21 size_t buf_len; |
22 |
22 |
23 /* Offset of trailing data in buf */ |
23 /* Offset of trailing data in buf */ |
24 size_t tail_offset; |
24 size_t tail_offset; |
25 |
25 |
26 /* Length of trailing data in buf, if any */ |
26 /* Length of trailing data in buf, if any */ |
27 size_t tail_len; |
27 size_t tail_len; |
|
28 |
|
29 /* Amount of data in the out buffer */ |
|
30 size_t out_offset; |
28 |
31 |
29 /* Last error */ |
32 /* Last error */ |
30 struct error_info err; |
33 struct error_info err; |
31 |
34 |
32 /* Callback info */ |
35 /* Callback info */ |
48 // sanity-check |
51 // sanity-check |
49 assert(lp->tail_offset < lp->buf_len); |
52 assert(lp->tail_offset < lp->buf_len); |
50 |
53 |
51 do { |
54 do { |
52 // attempt to read a line |
55 // attempt to read a line |
53 if (line_proto_read(lp, &line)) |
56 if (line_proto_recv(lp, &line)) |
54 // XXX: fail |
57 // XXX: fail |
55 errx(1, "line_proto_read: %s", error_msg(&lp->err)); |
58 errx(1, "line_proto_recv: %s", error_msg(&lp->err)); |
56 |
59 |
57 // got a line? |
60 // got a line? |
58 if (line) |
61 if (line) |
59 lp->cb_read(line, lp->cb_arg); |
62 lp->cb_read(line, lp->cb_arg); |
60 |
63 |
64 if (line_proto_schedule_events(lp, EV_READ)) |
67 if (line_proto_schedule_events(lp, EV_READ)) |
65 errx(1, "line_proto_schedule_events: %s", error_msg(&lp->err)); |
68 errx(1, "line_proto_schedule_events: %s", error_msg(&lp->err)); |
66 } |
69 } |
67 |
70 |
68 /* |
71 /* |
|
72 * Signal for write |
|
73 */ |
|
74 static void line_proto_on_write (struct sock_stream *sock, void *arg) |
|
75 { |
|
76 struct line_proto *lp = arg; |
|
77 err_t err; |
|
78 |
|
79 // just flush |
|
80 if ((err = line_proto_flush(lp)) < 0) |
|
81 errx(1, "line_proto_flush: %s", error_name(err)); |
|
82 } |
|
83 |
|
84 /* |
69 * Schedule our sock_stream callback |
85 * Schedule our sock_stream callback |
70 */ |
86 */ |
71 static err_t line_proto_schedule_events (struct line_proto *lp, short what) |
87 static err_t line_proto_schedule_events (struct line_proto *lp, short what) |
72 { |
88 { |
73 // just use sock_stream's interface |
89 // just use sock_stream's interface |
84 err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, size_t buf_size, |
100 err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, size_t buf_size, |
85 line_proto_read_cb cb_func, void *cb_arg, struct error_info *err) |
101 line_proto_read_cb cb_func, void *cb_arg, struct error_info *err) |
86 { |
102 { |
87 struct line_proto *lp; |
103 struct line_proto *lp; |
88 |
104 |
89 // allocate |
105 // allocate struct and buffers |
90 if ((lp = calloc(1, sizeof(*lp))) == NULL) |
106 if ( |
91 return SET_ERROR(err, ERR_CALLOC); |
107 (lp = calloc(1, sizeof(*lp))) == NULL |
92 |
108 || (lp->in_buf = malloc(buf_size)) == NULL |
93 // allocate buffer |
109 || (lp->out_buf = malloc(buf_size)) == NULL |
94 if ((lp->buf = malloc(buf_size)) == NULL) { |
110 ) |
95 free(lp); |
111 JUMP_SET_ERROR(err, ERR_CALLOC); |
96 return SET_ERROR(err, ERR_CALLOC); |
|
97 } |
|
98 |
112 |
99 // store |
113 // store |
100 lp->sock = sock; |
114 lp->sock = sock; |
101 lp->buf_len = buf_size; |
115 lp->buf_len = buf_size; |
102 lp->cb_read = cb_func; |
116 lp->cb_read = cb_func; |
156 |
178 |
157 // return offset to next line, as set in loop based on delim |
179 // return offset to next line, as set in loop based on delim |
158 return next; |
180 return next; |
159 } |
181 } |
160 |
182 |
161 err_t line_proto_read (struct line_proto *lp, char **line_ptr) |
183 err_t line_proto_recv (struct line_proto *lp, char **line_ptr) |
162 { |
184 { |
163 // offset to recv() new data into, offset to _parse_line hint, offset to next line from _parse_line |
185 // offset to recv() new data into, offset to _parse_line hint, offset to next line from _parse_line |
164 size_t recv_offset = 0, peek_offset = 0, next_offset = 0; |
186 size_t recv_offset = 0, peek_offset = 0, next_offset = 0; |
165 int ret; |
187 int ret; |
166 |
188 |
168 recv_offset = lp->tail_len; |
190 recv_offset = lp->tail_len; |
169 |
191 |
170 // move trailing data from previous line to front of buffer |
192 // move trailing data from previous line to front of buffer |
171 if (lp->tail_offset) { |
193 if (lp->tail_offset) { |
172 // move to front |
194 // move to front |
173 memmove(lp->buf, lp->buf + lp->tail_offset, lp->tail_len); |
195 memmove(lp->in_buf, lp->in_buf + lp->tail_offset, lp->tail_len); |
174 |
196 |
175 // reset |
197 // reset |
176 lp->tail_offset = 0; |
198 lp->tail_offset = 0; |
177 lp->tail_len = 0; |
199 lp->tail_len = 0; |
178 } |
200 } |
179 |
201 |
180 // readline loop |
202 // readline loop |
181 do { |
203 do { |
182 // parse any line at the beginning of the buffer |
204 // parse any line at the beginning of the buffer |
183 if ((next_offset = _parse_line(lp->buf, recv_offset, &peek_offset)) > 0) { |
205 if ((next_offset = _parse_line(lp->in_buf, recv_offset, &peek_offset)) > 0) { |
184 // store a valid *line_ptr |
206 // store a valid *line_ptr |
185 *line_ptr = lp->buf; |
207 *line_ptr = lp->in_buf; |
186 |
208 |
187 // exit loop and return |
209 // exit loop and return |
188 break; |
210 break; |
189 } |
211 } |
190 |
212 |
191 // ensure there's enough space for the rest of the line |
213 // ensure there's enough space for the rest of the line |
192 // XXX: this must be an error, not an assert |
214 if (recv_offset >= lp->buf_len) |
193 assert(recv_offset < lp->buf_len); |
215 return ERR_LINE_TOO_LONG; |
194 |
216 |
195 // otherwise, read more data |
217 // otherwise, read more data |
196 if ((ret = sock_stream_read(lp->sock, lp->buf + recv_offset, lp->buf_len - recv_offset)) < 0) |
218 if ((ret = sock_stream_read(lp->sock, lp->in_buf + recv_offset, lp->buf_len - recv_offset)) < 0) |
197 // store and return NULL on errors |
219 // store and return NULL on errors |
198 RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock)); |
220 RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock)); |
199 |
221 |
200 // EAGAIN? |
222 // EAGAIN? |
201 if (ret == 0) { |
223 if (ret == 0) { |
215 |
237 |
216 // ok |
238 // ok |
217 return SUCCESS; |
239 return SUCCESS; |
218 } |
240 } |
219 |
241 |
220 int line_proto_write (struct line_proto *lp, const char *line) |
242 int line_proto_send (struct line_proto *lp, const char *line) |
221 { |
243 { |
222 int ret; |
244 int ret; |
223 size_t len = strlen(line); |
245 size_t len = strlen(line); |
224 |
246 |
225 // XXX: no output buffers for now :) |
247 // drop line if we already have output buffered |
226 if ((ret = sock_stream_write(lp->sock, line, len)) < 0) |
248 if (lp->out_offset) |
227 RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock)); |
249 return -ERR_WRITE_EOF; |
|
250 |
|
251 // try and write the line |
|
252 if ((ret = sock_stream_write(lp->sock, line, len)) < 0) { |
|
253 SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock)); |
|
254 |
|
255 return -ERROR_CODE(&lp->err); |
|
256 } |
228 |
257 |
229 // EAGAIN or partial? |
258 // EAGAIN or partial? |
230 if (ret < len) { |
259 if (ret < len) { |
231 // XXX: ugly hack, need partial line buffering |
260 size_t trailing = len - ret; |
232 return -1; |
261 |
233 } |
262 // ensure it's not waaaay too long |
|
263 if (trailing > lp->buf_len) |
|
264 return -ERR_LINE_TOO_LONG; |
|
265 |
|
266 // copy remaining portion to buffer |
|
267 memcpy(lp->out_buf, line + ret, trailing); |
|
268 |
|
269 // update offset |
|
270 lp->out_offset = trailing; |
|
271 |
|
272 // register for EV_WRITE |
|
273 line_proto_schedule_events(lp, EV_READ | EV_WRITE); |
|
274 |
|
275 // buffered... |
|
276 return 1; |
|
277 |
|
278 } else { |
|
279 // ok, no buffering needed |
|
280 return SUCCESS; |
|
281 |
|
282 } |
|
283 } |
|
284 |
|
285 int line_proto_flush (struct line_proto *lp) |
|
286 { |
|
287 int ret; |
|
288 |
|
289 // try and write the line |
|
290 if ((ret = sock_stream_write(lp->sock, lp->out_buf, lp->out_offset)) < 0) { |
|
291 SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock)); |
|
292 |
|
293 return -ERROR_CODE(&lp->err); |
|
294 } |
|
295 |
|
296 // empty now? |
|
297 if (ret == lp->out_offset) { |
|
298 lp->out_offset = 0; |
|
299 |
|
300 return SUCCESS; |
|
301 } |
|
302 |
|
303 // partial? |
|
304 if (ret > 0) { |
|
305 size_t remaining = lp->out_offset - ret; |
|
306 |
|
307 // move the rest up |
|
308 memmove(lp->out_buf, lp->out_buf + ret, remaining); |
|
309 |
|
310 // update offset |
|
311 lp->out_offset = remaining; |
|
312 } |
|
313 |
|
314 // register for EV_WRITE |
|
315 line_proto_schedule_events(lp, EV_READ | EV_WRITE); |
234 |
316 |
235 // ok |
317 // ok |
236 return SUCCESS; |
318 return 1; |
237 } |
319 } |
238 |
320 |
239 const struct error_info* line_proto_error (struct line_proto *lp) |
321 const struct error_info* line_proto_error (struct line_proto *lp) |
240 { |
322 { |
241 // return pointer |
323 // return pointer |
242 return &lp->err; |
324 return &lp->err; |
243 } |
325 } |
|
326 |