src/line_proto.c
author Tero Marttila <terom@fixme.fi>
Thu, 12 Mar 2009 22:50:08 +0200
changeset 45 71e65564afd2
parent 41 40f7aa051acb
child 47 7d4094eb3117
permissions -rw-r--r--
remove irc_chan.state, modify irc_chan_callbacks.on_msg to take a irc_nm, improve error handling a bit further (up to irc_net now)

#include "line_proto.h"
#include "log.h"

#include <string.h>
#include <stdlib.h>
#include <assert.h>

/*
 * Per-connection line protocol state
 */
struct line_proto {
    /* Underlying sock_stream used for all reads and writes */
    struct sock_stream *sock;

    /* Buffer that incoming data is received into, and that lines are parsed from */
    char *in_buf;

    /* Buffer holding the not-yet-written remainder of an outgoing line */
    char *out_buf;

    /* Allocated size of in_buf and of out_buf (both are the same size) */
    size_t buf_len;

    /* Offset in in_buf of the data that follows the most recently returned line */
    size_t tail_offset;

    /* Number of bytes of such leftover data in in_buf, possibly zero */
    size_t tail_len;

    /* Number of bytes currently pending in out_buf */
    size_t out_offset;

    /* Most recent error */
    struct error_info err;

    /* User callbacks, and the context argument handed to each of them */
    struct line_proto_callbacks callbacks;
    void *cb_arg;
};

// forward declaration: line_proto_on_read uses this before its definition further down
static err_t line_proto_schedule_events (struct line_proto *lp, short what);

/**
 * An error occured which we could not recover from; the line_proto should now be considered corrupt.
 *
 * Notify the user callback, which will probably call line_proto_release().
 */
static void line_proto_set_error (struct line_proto *lp)
{
    // trigger callback
    lp->callbacks.on_error(&lp->err, lp->cb_arg);
}

/**
 * Our sock_stream on_read handler.
 *
 * Pulls lines out of the line_proto using line_proto_recv(), handing each one to the user's on_line callback, until
 * line_proto_recv() reports that no further complete line is available. Read events are then re-enabled. A failure in
 * either step is reported via the on_error callback.
 *
 * The \a sock argument is unused; \a arg is the struct line_proto registered along with the callbacks.
 */
static void line_proto_on_read (struct sock_stream *sock, void *arg)
{
    struct line_proto *lp = arg;
    char *line = NULL;

    (void) sock;

    // sanity-check
    assert(lp->tail_offset < lp->buf_len);
    
    do {
        // attempt to read a line
        if (line_proto_recv(lp, &line)) {
            // faaail; the callback will probably release lp, so don't touch it afterwards
            line_proto_set_error(lp);

            return;
        }

        // got a line?
        // NOTE(review): if on_line were to release the line_proto, this loop would keep using it - confirm that
        // callbacks never do that
        if (line)
            lp->callbacks.on_line(line, lp->cb_arg);

    } while (line);

    // out of complete lines for now, so wait for more data to arrive
    if (line_proto_schedule_events(lp, EV_READ))
        line_proto_set_error(lp);
}

/*
 * Our sock_stream on_write handler: the stream is ready for writing, so try to push out whatever is still pending in
 * out_buf.
 *
 * The \a sock argument is unused; \a arg is the struct line_proto registered along with the callbacks.
 */
static void line_proto_on_write (struct sock_stream *sock, void *arg)
{
    struct line_proto *lp = arg;
    int ret;

    (void) sock;

    // just flush; a zero or positive result needs no further action from us
    if ((ret = line_proto_flush(lp)) < 0) {
        // faaaail; line_proto_flush returns the negated error code
        SET_ERROR(&lp->err, -ret);

        line_proto_set_error(lp);
    }
}

/*
 * Enable the given event types (EV_READ and/or EV_WRITE) on our sock_stream.
 *
 * On failure, the sock_stream's error info is passed on to RETURN_SET_ERROR_INFO together with lp->err.
 */
static err_t line_proto_schedule_events (struct line_proto *lp, short what) 
{
    struct sock_stream *stream = lp->sock;

    // sock_stream does the real work here
    if (sock_stream_event_enable(stream, what)) {
        RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(stream));
    }

    return SUCCESS;
}

/*
 * The sock_stream callbacks through which a line_proto receives its stream's read/write events
 */
struct sock_stream_callbacks line_proto_sock_stream_callbacks = {
    .on_read    = line_proto_on_read,
    .on_write   = line_proto_on_write,
};

/**
 * Allocate a new line_proto on top of the given sock_stream, hook up its event handlers and start waiting for input.
 *
 * \a lp_ptr is set to the new line_proto on success, and left untouched on failure. \a buf_size is the size in bytes
 * of each of the incoming and outgoing buffers. \a callbacks is copied into the line_proto, and \a cb_arg is handed to
 * each callback. \a err is filled in on failure.
 *
 * If setting up the events fails, the half-built line_proto is torn down using line_proto_release(), which also
 * releases \a sock. If one of the allocations fails, \a sock is left alone.
 *
 * Returns SUCCESS, or the error code stored in \a err.
 */
err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, size_t buf_size,
        const struct line_proto_callbacks *callbacks, void *cb_arg, struct error_info *err)
{
    struct line_proto *lp;

    // allocate struct and buffers; calloc leaves every field we don't set zeroed
    if (
            (lp = calloc(1, sizeof(*lp))) == NULL
        ||  (lp->in_buf = malloc(buf_size)) == NULL
        ||  (lp->out_buf = malloc(buf_size)) == NULL
    )
        JUMP_SET_ERROR(err, ERR_CALLOC);

    // store
    lp->sock = sock;
    lp->buf_len = buf_size;
    lp->callbacks = *callbacks;
    lp->cb_arg = cb_arg;

    // hook up our event handlers
    if (sock_stream_event_init(sock, &line_proto_sock_stream_callbacks, lp)) {
        // nothing has written to lp->err yet, so copying from it would report an empty error; use the stream's own
        // NOTE(review): assumes sock_stream_event_init records its failure in the stream's error info, the same way
        // sock_stream_event_enable does - confirm
        JUMP_SET_ERROR_INFO(err, sock_stream_error(sock));
    }

    // start waiting for input; line_proto_schedule_events leaves its failure in lp->err
    if (line_proto_schedule_events(lp, EV_READ)) {
        JUMP_SET_ERROR_INFO(err, &lp->err);
    }

    // return
    *lp_ptr = lp;

    return SUCCESS;

error:
    // cleanup the lp; it is NULL if the very first allocation failed
    if (lp)
        line_proto_release(lp);

    return ERROR_CODE(err);
}

/*
 * Look for the first complete line in the given buffer, where a line is terminated by either '\r\n' or a bare '\n'.
 *
 * If one is found, the first byte of its terminator is overwritten with '\0', turning the line into a C string, and
 * the offset to the beginning of the following line is returned. If not, zero is returned (which is never a valid
 * next-line offset).
 *
 * The given \a hint is the offset at which to start scanning, so that repeated invocations on a growing buffer need
 * not rescan from the start. It is only updated when no line was found, and then points at the last byte in the
 * buffer, since that byte might be the '\r' of a '\r\n' whose '\n' has yet to arrive.
 */
int _parse_line (char *buf, size_t len, size_t *hint) {
    size_t start, nl_pos, term_pos;
    char *nl = NULL;

    // nothing to scan, and the hint stays as it is
    if (len == 0)
        return 0;

    start = *hint;

    // any terminator ends in a '\n', so the first one at or after the hint decides
    if (start < len)
        nl = memchr(buf + start, '\n', len - start);

    if (nl == NULL) {
        // incomplete line; resume at the last byte next time around
        *hint = len - 1;

        return 0;
    }

    nl_pos = (size_t) (nl - buf);
    term_pos = nl_pos;

    // a '\r' directly in front belongs to the terminator, but only if it lies within the scanned range
    if (nl_pos > start && buf[nl_pos - 1] == '\r')
        term_pos = nl_pos - 1;

    // mangle the newline off
    buf[term_pos] = '\0';

    // the next line begins right after the '\n'
    return (int) (nl_pos + 1);
}

/**
 * Try to read one complete line from the stream.
 *
 * Data left over from the previous call is first moved to the front of in_buf. The buffer is then alternately parsed
 * for a line and topped up using sock_stream_read(), until either a line is found or the read returns zero.
 *
 * On SUCCESS, \a line_ptr is set to the '\0'-terminated line, or to NULL if no complete line is available yet. The
 * line points into in_buf, which the next call will overwrite. \a line_ptr is not modified on errors.
 *
 * Returns SUCCESS, ERR_LINE_TOO_LONG if in_buf filled up without a line terminator showing up, or the result of
 * RETURN_SET_ERROR_INFO with the sock_stream's error if the read failed. lp->err is set in both error cases.
 */
err_t line_proto_recv (struct line_proto *lp, char **line_ptr)
{
    // offset to recv() new data into, offset to _parse_line hint, offset to next line from _parse_line
    size_t recv_offset = 0, peek_offset = 0, next_offset = 0;
    int ret;

    // adjust offset to beyond previous data (as will be moved next)
    recv_offset = lp->tail_len;

    // move trailing data from previous line to front of buffer
    if (lp->tail_offset) {
        // move to front, no-op if tail_len is zero
        memmove(lp->in_buf, lp->in_buf + lp->tail_offset, lp->tail_len);

        // reset
        lp->tail_offset = 0;
        lp->tail_len = 0;
    }

    // readline loop
    do {
        // parse any line at the beginning of the buffer
        if ((next_offset = _parse_line(lp->in_buf, recv_offset, &peek_offset)) > 0) {
            // store a valid *line_ptr
            *line_ptr = lp->in_buf;
            
            // exit loop and return
            break;
        }

        // ensure there's enough space for the rest of the line
        if (recv_offset >= lp->buf_len) {
            // record the error as well, so that on_error and line_proto_error() don't report a stale one
            SET_ERROR(&lp->err, ERR_LINE_TOO_LONG);

            return ERR_LINE_TOO_LONG;
        }
        
        // otherwise, read more data
        if ((ret = sock_stream_read(lp->sock, lp->in_buf + recv_offset, lp->buf_len - recv_offset)) < 0)
            // store the stream's error and return it
            RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));

        // EAGAIN?
        if (ret == 0) {
            // return a NULL *line_ptr; next_offset is zero here, so everything read so far is kept as the tail
            *line_ptr = NULL;
            break;
        }
        
        // update recv_offset
        recv_offset += ret;

    } while (1);

    // update state for next call
    lp->tail_offset = next_offset;
    lp->tail_len = recv_offset - next_offset;

    // ok
    return SUCCESS;
}

/*
 * Write the given '\0'-terminated line to the stream as-is, buffering whatever part of it cannot be written right
 * away.
 *
 * Returns SUCCESS if the whole line was written, 1 if some of it was buffered and EV_WRITE was scheduled to flush it
 * later, or a negated error code. Nothing is written if a previous line is still buffered; -ERR_LINE_TOO_LONG is
 * returned instead.
 */
int line_proto_send (struct line_proto *lp, const char *line)
{
    size_t line_len = strlen(line);
    size_t sent, unsent;
    int ret;

    // out_buf only has room for one pending line, so drop this one if it's taken
    if (lp->out_offset)
        return -ERR_LINE_TOO_LONG;
    
    // try writing it straight to the stream first
    ret = sock_stream_write(lp->sock, line, line_len);

    if (ret < 0) {
        SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));

        return -ERROR_CODE(&lp->err);
    }

    sent = ret;

    // all of it went out, so there's nothing to buffer
    if (sent >= line_len)
        return SUCCESS;

    // EAGAIN or partial write
    unsent = line_len - sent;

    // the leftovers must fit into out_buf
    // NOTE(review): the first `sent` bytes have already been written by now - confirm that callers treat this as fatal
    if (unsent > lp->buf_len)
        return -ERR_LINE_TOO_LONG;

    // stash the leftovers for line_proto_flush
    memcpy(lp->out_buf, line + sent, unsent);
    lp->out_offset = unsent;

    // have the stream tell us when it's writable again
    if (line_proto_schedule_events(lp, EV_READ | EV_WRITE))
        return -ERROR_CODE(&lp->err);

    // buffered...
    return 1;
}

/*
 * Try to write out the data pending in out_buf.
 *
 * Returns SUCCESS if out_buf is now empty, 1 if data still remains and EV_WRITE was scheduled again, or a negated
 * error code.
 */
int line_proto_flush (struct line_proto *lp)
{
    size_t written;
    int ret = sock_stream_write(lp->sock, lp->out_buf, lp->out_offset);

    // write failed outright
    if (ret < 0) {
        SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));

        return -ERROR_CODE(&lp->err);
    }

    written = ret;

    // everything went out
    if (written == lp->out_offset) {
        lp->out_offset = 0;

        return SUCCESS;
    }

    // partial write: shift the unwritten remainder down to the front of out_buf
    if (written > 0) {
        lp->out_offset -= written;

        memmove(lp->out_buf, lp->out_buf + written, lp->out_offset);
    }

    // still have data left, so ask to be told when the stream is writable again
    if (line_proto_schedule_events(lp, EV_READ | EV_WRITE))
        return -ERROR_CODE(&lp->err);

    // ok, but not done
    return 1;
}

/*
 * Get the line_proto's most recent error. The returned pointer refers to storage inside \a lp itself.
 */
const struct error_info* line_proto_error (struct line_proto *lp)
{
    const struct error_info *last = &lp->err;

    return last;
}

/*
 * Destroy the line_proto: frees both line buffers, releases the sock_stream if one is set, and finally frees the
 * line_proto itself.
 */
void line_proto_release (struct line_proto *lp)
{
    struct sock_stream *stream = lp->sock;

    // the line buffers go first
    free(lp->in_buf);
    free(lp->out_buf);

    // a line_proto that failed during creation may not have its stream stored yet
    if (stream != NULL)
        sock_stream_release(stream);

    // and lastly our own state
    free(lp);
}