improve line_proto output buffering slightly
authorTero Marttila <terom@fixme.fi>
Sat, 28 Feb 2009 23:48:34 +0200
changeset 19 8c80580ccde9
parent 18 dedf137b504f
child 20 d9c4c2980a0d
improve line_proto output buffering slightly
src/irc_conn.c
src/line_proto.c
src/line_proto.h
--- a/src/irc_conn.c	Sat Feb 28 22:47:39 2009 +0200
+++ b/src/irc_conn.c	Sat Feb 28 23:48:34 2009 +0200
@@ -63,7 +63,8 @@
     strcat(line_buf, "\r\n");
 
     // send using line_proto
-    return line_proto_write(conn->lp, line_buf);
+    // XXX: ignore output-buffering
+    return (err = line_proto_write(conn->lp, line_buf)) < 0 ? err : SUCCESS;
 }
 
 err_t irc_conn_NICK (struct irc_conn *conn, const char *nickname)
--- a/src/line_proto.c	Sat Feb 28 22:47:39 2009 +0200
+++ b/src/line_proto.c	Sat Feb 28 23:48:34 2009 +0200
@@ -14,10 +14,10 @@
     /* The sock_stream we read/write with */
     struct sock_stream *sock;
 
-    /* The incoming line buffer */
-    char *buf;
+    /* The incoming/outgoing line buffer */
+    char *in_buf, *out_buf;
 
-    /* Incoming buffer size */
+    /* Buffer size (same for both) */
     size_t buf_len;
 
     /* Offset of trailing data in buf */
@@ -26,6 +26,9 @@
     /* Length of trailing data in buf, if any */
     size_t tail_len;
 
+    /* Amount of data in the out buffer */
+    size_t out_offset;
+
     /* Last error */
     struct error_info err;
 
@@ -50,9 +53,9 @@
     
     do {
         // attempt to read a line
-        if (line_proto_read(lp, &line))
+        if (line_proto_recv(lp, &line))
             // XXX: fail
-            errx(1, "line_proto_read: %s", error_msg(&lp->err));
+            errx(1, "line_proto_recv: %s", error_msg(&lp->err));
         
         // got a line?
         if (line)
@@ -66,6 +69,19 @@
 }
 
 /*
+ * Signal for write
+ */
+static void line_proto_on_write (struct sock_stream *sock, void *arg)
+{
+    struct line_proto *lp = arg;
+    err_t err;
+
+    // just flush
+    if ((err = line_proto_flush(lp)) < 0)
+        errx(1, "line_proto_flush: %s", error_name(err));
+}
+
+/*
  * Schedule our sock_stream callback
  */
 static err_t line_proto_schedule_events (struct line_proto *lp, short what) 
@@ -86,15 +102,13 @@
 {
     struct line_proto *lp;
 
-    // allocate
-    if ((lp = calloc(1, sizeof(*lp))) == NULL)
-        return SET_ERROR(err, ERR_CALLOC);
-    
-    // allocate buffer
-    if ((lp->buf = malloc(buf_size)) == NULL) {
-        free(lp);
-        return SET_ERROR(err, ERR_CALLOC);
-    }
+    // allocate struct and buffers
+    if (
+            (lp = calloc(1, sizeof(*lp))) == NULL
+        ||  (lp->in_buf = malloc(buf_size)) == NULL
+        ||  (lp->out_buf = malloc(buf_size)) == NULL
+    )
+        JUMP_SET_ERROR(err, ERR_CALLOC);
 
     // store
     lp->sock = sock;
@@ -113,6 +127,14 @@
     *lp_ptr = lp;
 
     return SUCCESS;
+
+error:
+    if (lp) {
+        free(lp->in_buf);
+        free(lp->out_buf);
+    }
+
+    free(lp);
 }
 
 /*
@@ -158,7 +180,7 @@
     return next;
 }
 
-err_t line_proto_read (struct line_proto *lp, char **line_ptr)
+err_t line_proto_recv (struct line_proto *lp, char **line_ptr)
 {
     // offset to recv() new data into, offset to _parse_line hint, offset to next line from _parse_line
     size_t recv_offset = 0, peek_offset = 0, next_offset = 0;
@@ -170,7 +192,7 @@
     // move trailing data from previous line to front of buffer
     if (lp->tail_offset) {
         // move to front
-        memmove(lp->buf, lp->buf + lp->tail_offset, lp->tail_len);
+        memmove(lp->in_buf, lp->in_buf + lp->tail_offset, lp->tail_len);
 
         // reset
         lp->tail_offset = 0;
@@ -180,20 +202,20 @@
     // readline loop
     do {
         // parse any line at the beginning of the buffer
-        if ((next_offset = _parse_line(lp->buf, recv_offset, &peek_offset)) > 0) {
+        if ((next_offset = _parse_line(lp->in_buf, recv_offset, &peek_offset)) > 0) {
             // store a valid *line_ptr
-            *line_ptr = lp->buf;
+            *line_ptr = lp->in_buf;
             
             // exit loop and return
             break;
         }
 
         // ensure there's enough space for the rest of the line
-        // XXX: this must be an error, not an assert
-        assert(recv_offset < lp->buf_len);
+        if (recv_offset >= lp->buf_len)
+            return ERR_LINE_TOO_LONG;
         
         // otherwise, read more data
-        if ((ret = sock_stream_read(lp->sock, lp->buf + recv_offset, lp->buf_len - recv_offset)) < 0)
+        if ((ret = sock_stream_read(lp->sock, lp->in_buf + recv_offset, lp->buf_len - recv_offset)) < 0)
             // store and return NULL on errors
             RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
 
@@ -217,23 +239,83 @@
     return SUCCESS;
 }
 
-int line_proto_write (struct line_proto *lp, const char *line)
+int line_proto_send (struct line_proto *lp, const char *line)
 {
     int ret;
     size_t len = strlen(line);
 
-    // XXX: no output buffers for now :)
-    if ((ret = sock_stream_write(lp->sock, line, len)) < 0)
-        RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
+    // drop line if we already have output buffered
+    if (lp->out_offset)
+        return -ERR_WRITE_EOF;
+    
+    // try and write the line
+    if ((ret = sock_stream_write(lp->sock, line, len)) < 0) {
+        SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
+
+        return -ERROR_CODE(&lp->err);
+    }
 
     // EAGAIN or partial?
     if (ret < len) {
-        // XXX: ugly hack, need partial line buffering
-        return -1;
+        size_t trailing = len - ret;
+
+        // ensure it's not waaaay too long
+        if (trailing > lp->buf_len)
+            return -ERR_LINE_TOO_LONG;
+
+        // copy remaining portion to buffer
+        memcpy(lp->out_buf, line + ret, trailing);
+
+        // update offset
+        lp->out_offset = trailing;
+        
+        // register for EV_WRITE
+        line_proto_schedule_events(lp, EV_READ | EV_WRITE);
+
+        // buffered...
+        return 1;
+
+    } else {
+        // ok, no buffering needed
+        return SUCCESS;
+
+    }
+}
+
+int line_proto_flush (struct line_proto *lp)
+{
+    int ret;
+
+    // try and write the line
+    if ((ret = sock_stream_write(lp->sock, lp->out_buf, lp->out_offset)) < 0) {
+        SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
+
+        return -ERROR_CODE(&lp->err);
     }
 
+    // empty now?
+    if (ret == lp->out_offset) {
+        lp->out_offset = 0;
+
+        return SUCCESS;
+    }
+
+    // partial?
+    if (ret > 0) {
+        size_t remaining = lp->out_offset - ret;
+
+        // move the rest up
+        memmove(lp->out_buf, lp->out_buf + ret, remaining);
+
+        // update offset
+        lp->out_offset = remaining;
+    }
+
+    // register for EV_WRITE
+    line_proto_schedule_events(lp, EV_READ | EV_WRITE);
+
     // ok
-    return SUCCESS;
+    return 1;
 }
 
 const struct error_info* line_proto_error (struct line_proto *lp)
@@ -241,3 +323,4 @@
     // return pointer
     return &lp->err;
 }
+
--- a/src/line_proto.h	Sat Feb 28 22:47:39 2009 +0200
+++ b/src/line_proto.h	Sat Feb 28 23:48:34 2009 +0200
@@ -32,7 +32,7 @@
  * returned via *line_ptr, and we return SUCCESS. If we don't yet have a full line, and receiving more would block,
  * NULL is returned via *line_ptr instead. Otherwise, nonzero error return code.
  */
-err_t line_proto_read (struct line_proto *lp, char **line_ptr);
+err_t line_proto_recv (struct line_proto *lp, char **line_ptr);
 
 /*
  * Write a single line to the sock_stream, buffering any incomplete fragment that remains unset. Returns zero if the
@@ -40,7 +40,13 @@
  *
  * The given line should already include the terminating '\r\n' character sequence.
  */
-int line_proto_write (struct line_proto *lp, const char *line);
+int line_proto_send (struct line_proto *lp, const char *line);
+
+/*
+ * Flush out any buffered line fragment. Returns zero if the buffer was flushed empty, >0 if there's still fragments
+ * remaining, or -err on errors.
+ */
+int line_proto_flush (struct line_proto *lp);
 
 /*
  * Get current error_info*