src/line_proto.c
changeset 10 9fe218576d13
parent 8 be88e543c8ff
child 11 14e79683c48c
--- a/src/line_proto.c	Sun Feb 22 08:48:21 2009 +0200
+++ b/src/line_proto.c	Sun Feb 22 10:16:28 2009 +0200
@@ -5,6 +5,8 @@
 #include <stdlib.h>
 #include <assert.h>
 
+#include <err.h>
+
 /*
  * Our state
  */
@@ -12,6 +14,12 @@
     /* The sock_stream we read/write with */
     struct sock_stream *sock;
 
+    /* The incoming line buffer */
+    char *buf;
+
+    /* Incoming buffer size */
+    size_t buf_len;
+
     /* Offset of trailing data in buf */
     size_t tail_offset;
 
@@ -20,18 +28,82 @@
 
     /* Last error */
     struct error_info err;
+
+    /* Callback info */
+    line_proto_read_cb cb_read;
+    void *cb_arg;
 };
 
-err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, struct error_info *err)
+// function prototypes
+static err_t line_proto_schedule_events (struct line_proto *lp, short what);
+
+/*
+ * Our sock_stream on_read handler
+ */
+static void line_proto_on_read (struct sock_stream *sock, void *arg)
+{
+    struct line_proto *lp = arg;
+    const char *line;
+
+    // sanity-check
+    assert(lp->tail_offset < lp->buf_len);
+    
+    do {
+        // attempt to read a line
+        if (line_proto_read(lp, &line))
+            // XXX: fail
+            errx(1, "line_proto_read: %s", error_msg(&lp->err));
+        
+        // got a line?
+        if (line)
+            lp->cb_read(lp, line, lp->cb_arg);
+
+    } while (line);
+
+    // reschedule
+    if (line_proto_schedule_events(lp, EV_READ))
+        errx(1, "line_proto_schedule_events: %s", error_msg(&lp->err));
+}
+
+/*
+ * Schedule our sock_stream callback
+ */
+static err_t line_proto_schedule_events (struct line_proto *lp, short what) 
+{
+    // just use sock_stream's interface
+    if (sock_stream_event_enable(lp->sock, what))
+        RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
+
+    return SUCCESS;
+}
+
+struct sock_stream_callbacks line_proto_sock_stream_callbacks = {
+    .on_read    = &line_proto_on_read,
+};
+
+err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, size_t buf_size,
+        line_proto_read_cb cb_func, void *cb_arg, struct error_info *err)
 {
     struct line_proto *lp;
 
     // allocate
     if ((lp = calloc(1, sizeof(*lp))) == NULL)
         return SET_ERROR(err, ERR_CALLOC);
+    
+    // allocate buffer
+    if ((lp->buf = malloc(buf_size)) == NULL) {
+        free(lp);
+        return SET_ERROR(err, ERR_CALLOC);
+    }
 
     // store
     lp->sock = sock;
+    lp->buf_len = buf_size;
+    lp->cb_read = cb_func;
+    lp->cb_arg = cb_arg;
+
+    // initialize event-based stuff
+    sock_stream_event_init(sock, &line_proto_sock_stream_callbacks, lp);
 
     // return
     *lp_ptr = lp;
@@ -75,7 +147,7 @@
     return i + 2;
 }
 
-err_t line_proto_read (struct line_proto *lp, char *buf, size_t len)
+err_t line_proto_read (struct line_proto *lp, const char **line_ptr)
 {
     // offset to recv() new data into, offset to _parse_line hint, offset to next line from _parse_line
     size_t recv_offset = 0, peek_offset = 0, next_offset = 0;
@@ -87,7 +159,7 @@
     // move trailing data from previous line to front of buffer
     if (lp->tail_offset) {
         // move to front
-        memmove(buf, buf + lp->tail_offset, lp->tail_len);
+        memmove(lp->buf, lp->buf + lp->tail_offset, lp->tail_len);
 
         // reset
         lp->tail_offset = 0;
@@ -97,15 +169,32 @@
     // readline loop
     do {
         // parse any line at the beginning of the buffer
-        if ((next_offset = _parse_line(buf, recv_offset, &peek_offset)) > 0)
+        if ((next_offset = _parse_line(lp->buf, recv_offset, &peek_offset)) > 0) {
+            // store a valid *line_ptr
+            *line_ptr = lp->buf;
+            
+            // exit loop and return
             break;
+        }
 
-        // ensure there's enough space for it
-        assert(recv_offset < len);
+        // ensure there's enough space for the rest of the line
+        // XXX: this must be an error, not an assert
+        assert(recv_offset < lp->buf_len);
         
         // otherwise, read more data
-        if ((ret = sock_stream_read(lp->sock, buf + recv_offset, len - recv_offset)) < 0)
-            RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
+        if ((ret = sock_stream_read(lp->sock, lp->buf + recv_offset, lp->buf_len - recv_offset)) < 0) {
+            // we can special-case EAGAIN, as it's expected
+            if (MATCH_ERROR(sock_stream_error(lp->sock), ERR_READ, EAGAIN)) {
+                // return a NULL *line_ptr
+                *line_ptr = NULL;
+                break;
+
+            } else {
+                // store and return NULL on errors
+                RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
+
+            }
+        }
 
         // EOF?
         if (ret == 0)