--- a/src/line_proto.c Sun Feb 22 08:48:21 2009 +0200
+++ b/src/line_proto.c Sun Feb 22 10:16:28 2009 +0200
@@ -5,6 +5,8 @@
#include <stdlib.h>
#include <assert.h>
+#include <err.h>
+
/*
* Our state
*/
@@ -12,6 +14,12 @@
/* The sock_stream we read/write with */
struct sock_stream *sock;
+ /* The incoming line buffer */
+ char *buf;
+
+ /* Incoming buffer size */
+ size_t buf_len;
+
/* Offset of trailing data in buf */
size_t tail_offset;
@@ -20,18 +28,82 @@
/* Last error */
struct error_info err;
+
+ /* Callback info */
+ line_proto_read_cb cb_read;
+ void *cb_arg;
};
-err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, struct error_info *err)
+// function prototypes
+static err_t line_proto_schedule_events (struct line_proto *lp, short what);
+
+/*
+ * Our sock_stream on_read handler
+ */
+static void line_proto_on_read (struct sock_stream *sock, void *arg)
+{
+ struct line_proto *lp = arg;
+ const char *line;
+
+ // sanity-check
+ assert(lp->tail_offset < lp->buf_len);
+
+ do {
+ // attempt to read a line
+ if (line_proto_read(lp, &line))
+ // XXX: fail
+ errx(1, "line_proto_read: %s", error_msg(&lp->err));
+
+ // got a line?
+ if (line)
+ lp->cb_read(lp, line, lp->cb_arg);
+
+ } while (line);
+
+ // reschedule
+ if (line_proto_schedule_events(lp, EV_READ))
+ errx(1, "line_proto_schedule_events: %s", error_msg(&lp->err));
+}
+
+/*
+ * Schedule our sock_stream callback
+ */
+static err_t line_proto_schedule_events (struct line_proto *lp, short what)
+{
+ // just use sock_stream's interface
+ if (sock_stream_event_enable(lp->sock, what))
+ RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
+
+ return SUCCESS;
+}
+
+struct sock_stream_callbacks line_proto_sock_stream_callbacks = {
+ .on_read = &line_proto_on_read,
+};
+
+err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, size_t buf_size,
+ line_proto_read_cb cb_func, void *cb_arg, struct error_info *err)
{
struct line_proto *lp;
// allocate
if ((lp = calloc(1, sizeof(*lp))) == NULL)
return SET_ERROR(err, ERR_CALLOC);
+
+ // allocate buffer
+ if ((lp->buf = malloc(buf_size)) == NULL) {
+ free(lp);
+ return SET_ERROR(err, ERR_CALLOC);
+ }
// store
lp->sock = sock;
+ lp->buf_len = buf_size;
+ lp->cb_read = cb_func;
+ lp->cb_arg = cb_arg;
+
+ // initialize event-based stuff
+ sock_stream_event_init(sock, &line_proto_sock_stream_callbacks, lp);
// return
*lp_ptr = lp;
@@ -75,7 +147,7 @@
return i + 2;
}
-err_t line_proto_read (struct line_proto *lp, char *buf, size_t len)
+err_t line_proto_read (struct line_proto *lp, const char **line_ptr)
{
// offset to recv() new data into, offset to _parse_line hint, offset to next line from _parse_line
size_t recv_offset = 0, peek_offset = 0, next_offset = 0;
@@ -87,7 +159,7 @@
// move trailing data from previous line to front of buffer
if (lp->tail_offset) {
// move to front
- memmove(buf, buf + lp->tail_offset, lp->tail_len);
+ memmove(lp->buf, lp->buf + lp->tail_offset, lp->tail_len);
// reset
lp->tail_offset = 0;
@@ -97,15 +169,32 @@
// readline loop
do {
// parse any line at the beginning of the buffer
- if ((next_offset = _parse_line(buf, recv_offset, &peek_offset)) > 0)
+ if ((next_offset = _parse_line(lp->buf, recv_offset, &peek_offset)) > 0) {
+ // store a valid *line_ptr
+ *line_ptr = lp->buf;
+
+ // exit loop and return
break;
+ }
- // ensure there's enough space for it
- assert(recv_offset < len);
+ // ensure there's enough space for the rest of the line
+ // XXX: this must be an error, not an assert
+ assert(recv_offset < lp->buf_len);
// otherwise, read more data
- if ((ret = sock_stream_read(lp->sock, buf + recv_offset, len - recv_offset)) < 0)
- RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
+ if ((ret = sock_stream_read(lp->sock, lp->buf + recv_offset, lp->buf_len - recv_offset)) < 0) {
+ // we can special-case EAGAIN, as it's expected
+ if (MATCH_ERROR(sock_stream_error(lp->sock), ERR_READ, EAGAIN)) {
+ // return a NULL *line_ptr
+ *line_ptr = NULL;
+ break;
+
+ } else {
+ // store and return NULL on errors
+ RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
+
+ }
+ }
// EOF?
if (ret == 0)