--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib/line_proto.c Thu May 28 01:17:36 2009 +0300
@@ -0,0 +1,336 @@
+
+#include "line_proto.h"
+#include "log.h"
+
+#include <string.h>
+#include <stdlib.h>
+#include <assert.h>
+
+/*
+ * Our state
+ */
+struct line_proto {
+ /* The transport we read/write with */
+ transport_t *transport;
+
+ /* The incoming/outgoing line buffer */
+ char *in_buf, *out_buf;
+
+ /* Buffer size (same for both) */
+ size_t buf_len;
+
+ /* Offset of trailing data in buf */
+ size_t tail_offset;
+
+ /* Length of trailing data in buf, if any */
+ size_t tail_len;
+
+ /* Amount of data in the out buffer */
+ size_t out_offset;
+
+ /* Last error */
+ struct error_info err;
+
+ /* Callback info */
+ struct line_proto_callbacks callbacks;
+ void *cb_arg;
+};
+
+/**
+ * An error occured which we could not recover from; the line_proto should now be considered corrupt.
+ *
+ * Notify the user callback, which will probably call line_proto_release().
+ */
+static void line_proto_set_error (struct line_proto *lp)
+{
+ // copy error_info, as it might get free'd
+ struct error_info err = lp->err;
+
+ // trigger callback
+ lp->callbacks.on_error(&err, lp->cb_arg);
+}
+
+/**
+ * Our transport_callbacks::on_read handler
+ */
+static void line_proto_on_read (transport_t *transport, void *arg)
+{
+ struct line_proto *lp = arg;
+ char *line;
+
+ (void) transport;
+
+ // sanity-check
+ assert(lp->tail_offset < lp->buf_len);
+
+ do {
+ // attempt to read a line
+ if (line_proto_recv(lp, &line))
+ // faaail
+ return line_proto_set_error(lp);
+
+ // got a line?
+ if (line)
+ lp->callbacks.on_line(line, lp->cb_arg);
+
+ } while (line);
+}
+
+/*
+ * Signal for write
+ */
+static void line_proto_on_write (transport_t *transport, void *arg)
+{
+ struct line_proto *lp = arg;
+ int ret;
+
+ (void) transport;
+
+ // just flush
+ if ((ret = line_proto_flush(lp)) < 0)
+ // faaail
+ return line_proto_set_error(lp);
+}
+
+// XXX: implement on_error!
+static const struct transport_callbacks line_proto_transport_callbacks = {
+ .on_read = &line_proto_on_read,
+ .on_write = &line_proto_on_write,
+};
+
+err_t line_proto_create (struct line_proto **lp_ptr, transport_t *transport, size_t buf_size,
+ const struct line_proto_callbacks *callbacks, void *cb_arg, error_t *err)
+{
+ struct line_proto *lp;
+
+ // alloc
+ if ((lp = calloc(1, sizeof(*lp))) == NULL)
+ return SET_ERROR(err, ERR_CALLOC);
+
+ // store
+ lp->transport = transport;
+ lp->buf_len = buf_size;
+ lp->callbacks = *callbacks;
+ lp->cb_arg = cb_arg;
+
+ // allocate buffers
+ if (
+ (lp->in_buf = malloc(buf_size)) == NULL
+ || (lp->out_buf = malloc(buf_size)) == NULL
+ )
+ JUMP_SET_ERROR(err, ERR_CALLOC);
+
+ // setup the transport
+ transport_set_callbacks(transport, &line_proto_transport_callbacks, lp);
+
+ if ((ERROR_CODE(err) = transport_events(transport, TRANSPORT_READ | TRANSPORT_WRITE)))
+ goto error;
+
+ // return
+ *lp_ptr = lp;
+
+ return SUCCESS;
+
+error:
+ // cleanup the lp
+ line_proto_destroy(lp);
+
+ return ERROR_CODE(err);
+}
+
+/*
+ * This looks for a full '\r\n' terminated line at the beginning of the given buffer. If found, the \r\n will be
+ * replaced with a '\0', and the offset to the beginning of the next line returned. If not found, zero is returned
+ * (which is never a valid next-line offset).
+ *
+ * The given \a hint is an hint as to the offset at which to start scanning, used for incremental invocations of this
+ * on the same buffer.
+ *
+ */
+int _parse_line (char *buf, size_t len, size_t *hint) {
+ size_t i, next = 0;
+
+ // empty buffer -> nothing
+ if (len == 0)
+ return 0;
+
+ // look for terminating '\r\n' or '\n' sequence
+ for (i = *hint; i < len; i++) {
+ // match this + next char?
+ if (i < len - 1 && buf[i] == '\r' && buf[i + 1] == '\n') {
+ next = i + 2;
+ break;
+
+ } else if (buf[i] == '\n') {
+ next = i + 1;
+ break;
+ }
+ }
+
+ // searched the whole buffer?
+ if (i >= len) {
+ // do continue one char back, to keep any \r
+ *hint = len - 1;
+ return 0;
+ }
+
+ // mangle the newline off
+ buf[i] = '\0';
+
+ // return offset to next line, as set in loop based on delim
+ return next;
+}
+
+err_t line_proto_recv (struct line_proto *lp, char **line_ptr)
+{
+ // offset to recv() new data into, offset to _parse_line hint, offset to next line from _parse_line
+ size_t recv_offset = 0, peek_offset = 0, next_offset = 0;
+ int ret;
+
+ // adjust offset to beyond previous data (as will be moved next)
+ recv_offset = lp->tail_len;
+
+ // move trailing data from previous line to front of buffer
+ if (lp->tail_offset) {
+ // move to front, no-op if tail_len is zero
+ memmove(lp->in_buf, lp->in_buf + lp->tail_offset, lp->tail_len);
+
+ // reset
+ lp->tail_offset = 0;
+ lp->tail_len = 0;
+ }
+
+ // readline loop
+ do {
+ // parse any line at the beginning of the buffer
+ if ((next_offset = _parse_line(lp->in_buf, recv_offset, &peek_offset)) > 0) {
+ // store a valid *line_ptr
+ *line_ptr = lp->in_buf;
+
+ // exit loop and return
+ break;
+ }
+
+ // ensure there's enough space for the rest of the line
+ if (recv_offset >= lp->buf_len)
+ return ERR_LINE_TOO_LONG;
+
+ // otherwise, read more data
+ if ((ret = transport_read(lp->transport, lp->in_buf + recv_offset, lp->buf_len - recv_offset, &lp->err)) < 0)
+ return ERROR_CODE(&lp->err);
+
+ // EAGAIN?
+ if (ret == 0) {
+ // return a NULL *line_ptr
+ *line_ptr = NULL;
+ break;
+ }
+
+ // update recv_offset
+ recv_offset += ret;
+
+ } while (1);
+
+ // update state for next call
+ lp->tail_offset = next_offset;
+ lp->tail_len = recv_offset - next_offset;
+
+ // ok
+ return SUCCESS;
+}
+
+int line_proto_send (struct line_proto *lp, const char *line)
+{
+ int ret;
+ size_t len = strlen(line), ret_len;
+
+ // drop line if we already have output buffered
+ if (lp->out_offset)
+ return -ERR_LINE_TOO_LONG;
+
+ // try and write the line
+ if ((ret = transport_write(lp->transport, line, len, &lp->err)) < 0)
+ return -ERROR_CODE(&lp->err);
+
+ // length of the sent data
+ ret_len = ret;
+
+ // EAGAIN or partial?
+ if (ret_len < len) {
+ size_t trailing = len - ret_len;
+
+ // ensure it's not waaaay too long
+ if (trailing > lp->buf_len)
+ return -ERR_LINE_TOO_LONG;
+
+ // copy remaining portion to buffer
+ memcpy(lp->out_buf, line + ret_len, trailing);
+
+ // update offset
+ lp->out_offset = trailing;
+
+ // buffered... transport should invoke on_write itself
+ return 1;
+
+ } else {
+ // ok, no buffering needed
+ return SUCCESS;
+
+ }
+}
+
+int line_proto_flush (struct line_proto *lp)
+{
+ int ret;
+ size_t ret_len;
+
+ assert(lp->out_offset);
+
+ // try and write the line
+ if ((ret = transport_write(lp->transport, lp->out_buf, lp->out_offset, &lp->err)) < 0)
+ return -ERROR_CODE(&lp->err);
+
+ ret_len = ret;
+
+ // empty now?
+ if (ret_len == lp->out_offset) {
+ lp->out_offset = 0;
+
+ return SUCCESS;
+ }
+
+ // partial?
+ if (ret_len > 0) {
+ size_t remaining = lp->out_offset - ret_len;
+
+ // move the rest up
+ memmove(lp->out_buf, lp->out_buf + ret_len, remaining);
+
+ // update offset
+ lp->out_offset = remaining;
+ }
+
+ // ok
+ return 1;
+}
+
+const struct error_info* line_proto_error (struct line_proto *lp)
+{
+ // return pointer
+ return &lp->err;
+}
+
+void line_proto_destroy (struct line_proto *lp)
+{
+ // free buffers
+ free(lp->in_buf);
+ free(lp->out_buf);
+
+ // socket?
+ if (lp->transport)
+ transport_destroy(lp->transport);
+
+ // free the state itself
+ free(lp);
+}
+