terom@8: terom@8: #include "line_proto.h" terom@22: #include "log.h" terom@8: terom@8: #include terom@8: #include terom@8: #include terom@8: terom@8: /* terom@8: * Our state terom@8: */ terom@8: struct line_proto { terom@156: /* The transport we read/write with */ terom@156: transport_t *transport; terom@8: terom@19: /* The incoming/outgoing line buffer */ terom@19: char *in_buf, *out_buf; terom@10: terom@19: /* Buffer size (same for both) */ terom@10: size_t buf_len; terom@10: terom@8: /* Offset of trailing data in buf */ terom@8: size_t tail_offset; terom@8: terom@8: /* Length of trailing data in buf, if any */ terom@8: size_t tail_len; terom@8: terom@19: /* Amount of data in the out buffer */ terom@19: size_t out_offset; terom@19: terom@8: /* Last error */ terom@8: struct error_info err; terom@10: terom@10: /* Callback info */ terom@32: struct line_proto_callbacks callbacks; terom@10: void *cb_arg; terom@8: }; terom@8: terom@33: /** terom@45: * An error occured which we could not recover from; the line_proto should now be considered corrupt. terom@33: * terom@45: * Notify the user callback, which will probably call line_proto_release(). terom@33: */ terom@45: static void line_proto_set_error (struct line_proto *lp) terom@33: { terom@47: // copy error_info, as it might get free'd terom@47: struct error_info err = lp->err; terom@47: terom@33: // trigger callback terom@47: lp->callbacks.on_error(&err, lp->cb_arg); terom@33: } terom@33: terom@33: /** terom@156: * Our transport_callbacks::on_read handler terom@10: */ terom@156: static void line_proto_on_read (transport_t *transport, void *arg) terom@10: { terom@10: struct line_proto *lp = arg; terom@17: char *line; terom@10: terom@156: (void) transport; terom@27: terom@10: // sanity-check terom@10: assert(lp->tail_offset < lp->buf_len); terom@10: terom@10: do { terom@10: // attempt to read a line terom@156: if (line_proto_recv(lp, &line)) terom@33: // faaail terom@45: return line_proto_set_error(lp); terom@33: terom@10: // got a line? terom@10: if (line) terom@32: lp->callbacks.on_line(line, lp->cb_arg); terom@10: terom@10: } while (line); terom@10: } terom@10: terom@10: /* terom@19: * Signal for write terom@19: */ terom@156: static void line_proto_on_write (transport_t *transport, void *arg) terom@19: { terom@19: struct line_proto *lp = arg; terom@27: int ret; terom@27: terom@156: (void) transport; terom@19: terom@19: // just flush terom@156: if ((ret = line_proto_flush(lp)) < 0) terom@156: // faaail terom@45: return line_proto_set_error(lp); terom@19: } terom@19: terom@156: static const struct transport_callbacks line_proto_transport_callbacks = { terom@10: .on_read = &line_proto_on_read, terom@20: .on_write = &line_proto_on_write, terom@10: }; terom@10: terom@156: err_t line_proto_create (struct line_proto **lp_ptr, transport_t *transport, size_t buf_size, terom@156: const struct line_proto_callbacks *callbacks, void *cb_arg, error_t *err) terom@8: { terom@8: struct line_proto *lp; terom@8: terom@160: // alloc terom@160: if ((lp = calloc(1, sizeof(*lp))) == NULL) terom@160: return SET_ERROR(err, ERR_CALLOC); terom@160: terom@157: // store terom@157: lp->transport = transport; terom@157: lp->buf_len = buf_size; terom@157: lp->callbacks = *callbacks; terom@157: lp->cb_arg = cb_arg; terom@157: terom@160: // allocate buffers terom@19: if ( terom@160: (lp->in_buf = malloc(buf_size)) == NULL terom@19: || (lp->out_buf = malloc(buf_size)) == NULL terom@19: ) terom@19: JUMP_SET_ERROR(err, ERR_CALLOC); terom@8: terom@156: // setup the transport terom@156: transport_set_callbacks(transport, &line_proto_transport_callbacks, lp); terom@156: terom@156: if ((ERROR_CODE(err) = transport_events(transport, TRANSPORT_READ | TRANSPORT_WRITE))) terom@156: goto error; terom@8: terom@8: // return terom@8: *lp_ptr = lp; terom@8: terom@8: return SUCCESS; terom@19: terom@19: error: terom@28: // cleanup the lp terom@160: line_proto_destroy(lp); terom@20: terom@20: return ERROR_CODE(err); terom@8: } terom@8: terom@8: /* terom@8: * This looks for a full '\r\n' terminated line at the beginning of the given buffer. If found, the \r\n will be terom@8: * replaced with a '\0', and the offset to the beginning of the next line returned. If not found, zero is returned terom@8: * (which is never a valid next-line offset). terom@8: * terom@8: * The given \a hint is an hint as to the offset at which to start scanning, used for incremental invocations of this terom@8: * on the same buffer. terom@8: * terom@8: */ terom@8: int _parse_line (char *buf, size_t len, size_t *hint) { terom@27: size_t i, next = 0; terom@8: terom@8: // empty buffer -> nothing terom@8: if (len == 0) terom@8: return 0; terom@8: terom@13: // look for terminating '\r\n' or '\n' sequence terom@13: for (i = *hint; i < len; i++) { terom@13: // match this + next char? terom@13: if (i < len - 1 && buf[i] == '\r' && buf[i + 1] == '\n') { terom@13: next = i + 2; terom@8: break; terom@13: terom@13: } else if (buf[i] == '\n') { terom@13: next = i + 1; terom@13: break; terom@13: } terom@8: } terom@8: terom@13: // searched the whole buffer? terom@13: if (i >= len) { terom@13: // do continue one char back, to keep any \r terom@8: *hint = len - 1; terom@8: return 0; terom@8: } terom@8: terom@8: // mangle the newline off terom@8: buf[i] = '\0'; terom@8: terom@13: // return offset to next line, as set in loop based on delim terom@13: return next; terom@8: } terom@8: terom@19: err_t line_proto_recv (struct line_proto *lp, char **line_ptr) terom@8: { terom@8: // offset to recv() new data into, offset to _parse_line hint, offset to next line from _parse_line terom@8: size_t recv_offset = 0, peek_offset = 0, next_offset = 0; terom@8: int ret; terom@8: terom@12: // adjust offset to beyond previous data (as will be moved next) terom@8: recv_offset = lp->tail_len; terom@8: terom@8: // move trailing data from previous line to front of buffer terom@8: if (lp->tail_offset) { terom@41: // move to front, no-op if tail_len is zero terom@19: memmove(lp->in_buf, lp->in_buf + lp->tail_offset, lp->tail_len); terom@8: terom@8: // reset terom@8: lp->tail_offset = 0; terom@8: lp->tail_len = 0; terom@8: } terom@8: terom@8: // readline loop terom@8: do { terom@8: // parse any line at the beginning of the buffer terom@19: if ((next_offset = _parse_line(lp->in_buf, recv_offset, &peek_offset)) > 0) { terom@10: // store a valid *line_ptr terom@19: *line_ptr = lp->in_buf; terom@10: terom@10: // exit loop and return terom@8: break; terom@10: } terom@8: terom@10: // ensure there's enough space for the rest of the line terom@19: if (recv_offset >= lp->buf_len) terom@19: return ERR_LINE_TOO_LONG; terom@8: terom@8: // otherwise, read more data terom@156: if ((ret = transport_read(lp->transport, lp->in_buf + recv_offset, lp->buf_len - recv_offset, &lp->err)) < 0) terom@156: return ERROR_CODE(&lp->err); terom@10: terom@12: // EAGAIN? terom@12: if (ret == 0) { terom@12: // return a NULL *line_ptr terom@12: *line_ptr = NULL; terom@12: break; terom@10: } terom@8: terom@8: // update recv_offset terom@8: recv_offset += ret; terom@8: terom@8: } while (1); terom@8: terom@8: // update state for next call terom@8: lp->tail_offset = next_offset; terom@8: lp->tail_len = recv_offset - next_offset; terom@8: terom@8: // ok terom@8: return SUCCESS; terom@8: } terom@8: terom@19: int line_proto_send (struct line_proto *lp, const char *line) terom@18: { terom@18: int ret; terom@28: size_t len = strlen(line), ret_len; terom@18: terom@19: // drop line if we already have output buffered terom@19: if (lp->out_offset) terom@21: return -ERR_LINE_TOO_LONG; terom@19: terom@19: // try and write the line terom@156: if ((ret = transport_write(lp->transport, line, len, &lp->err)) < 0) terom@19: return -ERROR_CODE(&lp->err); terom@18: terom@28: // length of the sent data terom@28: ret_len = ret; terom@28: terom@18: // EAGAIN or partial? terom@28: if (ret_len < len) { terom@28: size_t trailing = len - ret_len; terom@19: terom@19: // ensure it's not waaaay too long terom@19: if (trailing > lp->buf_len) terom@19: return -ERR_LINE_TOO_LONG; terom@19: terom@19: // copy remaining portion to buffer terom@28: memcpy(lp->out_buf, line + ret_len, trailing); terom@19: terom@19: // update offset terom@19: lp->out_offset = trailing; terom@19: terom@156: // buffered... transport should invoke on_write itself terom@19: return 1; terom@19: terom@19: } else { terom@19: // ok, no buffering needed terom@19: return SUCCESS; terom@19: terom@19: } terom@19: } terom@19: terom@19: int line_proto_flush (struct line_proto *lp) terom@19: { terom@19: int ret; terom@28: size_t ret_len; terom@19: terom@160: assert(lp->out_offset); terom@160: terom@19: // try and write the line terom@156: if ((ret = transport_write(lp->transport, lp->out_buf, lp->out_offset, &lp->err)) < 0) terom@19: return -ERROR_CODE(&lp->err); terom@18: terom@28: ret_len = ret; terom@28: terom@19: // empty now? terom@28: if (ret_len == lp->out_offset) { terom@19: lp->out_offset = 0; terom@19: terom@19: return SUCCESS; terom@19: } terom@19: terom@19: // partial? terom@28: if (ret_len > 0) { terom@28: size_t remaining = lp->out_offset - ret_len; terom@19: terom@19: // move the rest up terom@28: memmove(lp->out_buf, lp->out_buf + ret_len, remaining); terom@19: terom@19: // update offset terom@19: lp->out_offset = remaining; terom@19: } terom@19: terom@18: // ok terom@19: return 1; terom@18: } terom@18: terom@8: const struct error_info* line_proto_error (struct line_proto *lp) terom@8: { terom@8: // return pointer terom@8: return &lp->err; terom@8: } terom@19: terom@156: void line_proto_destroy (struct line_proto *lp) terom@28: { terom@28: // free buffers terom@28: free(lp->in_buf); terom@28: free(lp->out_buf); terom@28: terom@28: // socket? terom@156: if (lp->transport) terom@156: transport_destroy(lp->transport); terom@28: terom@28: // free the state itself terom@28: free(lp); terom@28: } terom@28: