#include "line_proto.h"
#include <string.h>
#include <stdlib.h>
#include <assert.h>
#include <err.h>
/*
* Our state
*/
struct line_proto {
/* The sock_stream we read/write with */
struct sock_stream *sock;
/* The incoming/outgoing line buffer */
char *in_buf, *out_buf;
/* Buffer size (same for both) */
size_t buf_len;
/* Offset of trailing data in buf */
size_t tail_offset;
/* Length of trailing data in buf, if any */
size_t tail_len;
/* Amount of data in the out buffer */
size_t out_offset;
/* Last error */
struct error_info err;
/* Callback info */
line_proto_read_cb cb_read;
void *cb_arg;
};
// function prototypes
static err_t line_proto_schedule_events (struct line_proto *lp, short what);
/*
* Our sock_stream on_read handler
*/
static void line_proto_on_read (struct sock_stream *sock, void *arg)
{
struct line_proto *lp = arg;
char *line;
// sanity-check
assert(lp->tail_offset < lp->buf_len);
do {
// attempt to read a line
if (line_proto_recv(lp, &line))
// XXX: fail
errx(1, "line_proto_recv: %s", error_msg(&lp->err));
// got a line?
if (line)
lp->cb_read(line, lp->cb_arg);
} while (line);
// reschedule
if (line_proto_schedule_events(lp, EV_READ))
errx(1, "line_proto_schedule_events: %s", error_msg(&lp->err));
}
/*
* Signal for write
*/
static void line_proto_on_write (struct sock_stream *sock, void *arg)
{
struct line_proto *lp = arg;
err_t err;
// just flush
if ((err = line_proto_flush(lp)) < 0)
errx(1, "line_proto_flush: %s", error_name(err));
}
/*
* Schedule our sock_stream callback
*/
static err_t line_proto_schedule_events (struct line_proto *lp, short what)
{
// just use sock_stream's interface
if (sock_stream_event_enable(lp->sock, what))
RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
return SUCCESS;
}
struct sock_stream_callbacks line_proto_sock_stream_callbacks = {
.on_read = &line_proto_on_read,
.on_write = &line_proto_on_write,
};
err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, size_t buf_size,
line_proto_read_cb cb_func, void *cb_arg, struct error_info *err)
{
struct line_proto *lp;
// allocate struct and buffers
if (
(lp = calloc(1, sizeof(*lp))) == NULL
|| (lp->in_buf = malloc(buf_size)) == NULL
|| (lp->out_buf = malloc(buf_size)) == NULL
)
JUMP_SET_ERROR(err, ERR_CALLOC);
// store
lp->sock = sock;
lp->buf_len = buf_size;
lp->cb_read = cb_func;
lp->cb_arg = cb_arg;
// initialize event-based stuff
if (
sock_stream_event_init(sock, &line_proto_sock_stream_callbacks, lp)
|| line_proto_schedule_events(lp, EV_READ)
)
JUMP_SET_ERROR_INFO(err, &lp->err);
// return
*lp_ptr = lp;
return SUCCESS;
error:
if (lp) {
free(lp->in_buf);
free(lp->out_buf);
// XXX: handle sock init errors
}
free(lp);
return ERROR_CODE(err);
}
/*
* This looks for a full '\r\n' terminated line at the beginning of the given buffer. If found, the \r\n will be
* replaced with a '\0', and the offset to the beginning of the next line returned. If not found, zero is returned
* (which is never a valid next-line offset).
*
* The given \a hint is an hint as to the offset at which to start scanning, used for incremental invocations of this
* on the same buffer.
*
*/
int _parse_line (char *buf, size_t len, size_t *hint) {
int i, next = 0;
// empty buffer -> nothing
if (len == 0)
return 0;
// look for terminating '\r\n' or '\n' sequence
for (i = *hint; i < len; i++) {
// match this + next char?
if (i < len - 1 && buf[i] == '\r' && buf[i + 1] == '\n') {
next = i + 2;
break;
} else if (buf[i] == '\n') {
next = i + 1;
break;
}
}
// searched the whole buffer?
if (i >= len) {
// do continue one char back, to keep any \r
*hint = len - 1;
return 0;
}
// mangle the newline off
buf[i] = '\0';
// return offset to next line, as set in loop based on delim
return next;
}
err_t line_proto_recv (struct line_proto *lp, char **line_ptr)
{
// offset to recv() new data into, offset to _parse_line hint, offset to next line from _parse_line
size_t recv_offset = 0, peek_offset = 0, next_offset = 0;
int ret;
// adjust offset to beyond previous data (as will be moved next)
recv_offset = lp->tail_len;
// move trailing data from previous line to front of buffer
if (lp->tail_offset) {
// move to front
memmove(lp->in_buf, lp->in_buf + lp->tail_offset, lp->tail_len);
// reset
lp->tail_offset = 0;
lp->tail_len = 0;
}
// readline loop
do {
// parse any line at the beginning of the buffer
if ((next_offset = _parse_line(lp->in_buf, recv_offset, &peek_offset)) > 0) {
// store a valid *line_ptr
*line_ptr = lp->in_buf;
// exit loop and return
break;
}
// ensure there's enough space for the rest of the line
if (recv_offset >= lp->buf_len)
return ERR_LINE_TOO_LONG;
// otherwise, read more data
if ((ret = sock_stream_read(lp->sock, lp->in_buf + recv_offset, lp->buf_len - recv_offset)) < 0)
// store and return NULL on errors
RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
// EAGAIN?
if (ret == 0) {
// return a NULL *line_ptr
*line_ptr = NULL;
break;
}
// update recv_offset
recv_offset += ret;
} while (1);
// update state for next call
lp->tail_offset = next_offset;
lp->tail_len = recv_offset - next_offset;
// ok
return SUCCESS;
}
int line_proto_send (struct line_proto *lp, const char *line)
{
int ret;
size_t len = strlen(line);
// drop line if we already have output buffered
if (lp->out_offset)
return -ERR_WRITE_EOF;
// try and write the line
if ((ret = sock_stream_write(lp->sock, line, len)) < 0) {
SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
return -ERROR_CODE(&lp->err);
}
// EAGAIN or partial?
if (ret < len) {
size_t trailing = len - ret;
// ensure it's not waaaay too long
if (trailing > lp->buf_len)
return -ERR_LINE_TOO_LONG;
// copy remaining portion to buffer
memcpy(lp->out_buf, line + ret, trailing);
// update offset
lp->out_offset = trailing;
// register for EV_WRITE
line_proto_schedule_events(lp, EV_READ | EV_WRITE);
// buffered...
return 1;
} else {
// ok, no buffering needed
return SUCCESS;
}
}
int line_proto_flush (struct line_proto *lp)
{
int ret;
// try and write the line
if ((ret = sock_stream_write(lp->sock, lp->out_buf, lp->out_offset)) < 0) {
SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
return -ERROR_CODE(&lp->err);
}
// empty now?
if (ret == lp->out_offset) {
lp->out_offset = 0;
return SUCCESS;
}
// partial?
if (ret > 0) {
size_t remaining = lp->out_offset - ret;
// move the rest up
memmove(lp->out_buf, lp->out_buf + ret, remaining);
// update offset
lp->out_offset = remaining;
}
// register for EV_WRITE
line_proto_schedule_events(lp, EV_READ | EV_WRITE);
// ok
return 1;
}
const struct error_info* line_proto_error (struct line_proto *lp)
{
// return pointer
return &lp->err;
}