terom@196: #include "msg_proto.h" terom@196: terom@196: #include terom@196: #include terom@196: #include terom@196: terom@196: /** terom@196: * I/O buffer terom@196: */ terom@196: struct msg_buf { terom@196: /** Buffer base pointer */ terom@196: char *base; terom@196: terom@196: /** Size of the buffer */ terom@196: size_t size; terom@196: terom@196: /** Current read/write offset */ terom@196: size_t off; terom@196: }; terom@196: terom@196: /** terom@196: * The minimum size used for any msg_buf::size related operation. terom@196: */ terom@196: #define MSG_BUF_MIN_SIZE 1024 terom@196: terom@196: /** terom@196: * Growth rate for size terom@196: */ terom@196: #define MSG_BUF_GROW_RATE 2 terom@196: terom@196: /** terom@196: * Initialize a message buffer at the given initial size terom@196: */ terom@196: err_t msg_buf_init (struct msg_buf *buf, size_t hint) terom@196: { terom@196: // apply minimum size terom@196: if (hint < MSG_BUF_MIN_SIZE) terom@196: hint = MSG_BUF_MIN_SIZE; terom@196: terom@196: // allocate the initial buffer terom@196: if ((buf->base = malloc(hint)) == NULL) terom@196: return ERR_MEM; terom@196: terom@196: // set fields terom@196: buf->size = hint; terom@196: buf->off = 0; terom@196: terom@196: // ok terom@196: return SUCCESS; terom@196: } terom@196: terom@196: /** terom@196: * Grow the buffer if needed to fit the given capacity. terom@196: */ terom@196: err_t msg_buf_grow (struct msg_buf *buf, size_t size) terom@196: { terom@196: char *tmp = buf->base; terom@196: terom@196: if (buf->size >= size) terom@196: // nothing to do terom@196: return SUCCESS; terom@196: terom@196: // calculate new size terom@196: while (buf->size < size) terom@196: buf->size *= MSG_BUF_GROW_RATE; terom@196: terom@196: // resize terom@196: if ((buf->base = realloc(buf->base, buf->size)) == NULL) { terom@196: buf->base = tmp; terom@196: terom@196: return ERR_MEM; terom@196: } terom@196: terom@196: // ok terom@196: return SUCCESS; terom@196: } terom@196: terom@196: /** terom@196: * Drain \a len bytes off the head of the buffer terom@196: */ terom@196: err_t msg_buf_drain (struct msg_buf *buf, size_t len) terom@196: { terom@196: // simple memmove terom@196: memmove(buf->base, buf->base + len, buf->off - len); terom@196: terom@196: // update offfset terom@196: buf->off -= len; terom@196: terom@196: // ok terom@196: return SUCCESS; terom@196: } terom@196: terom@196: /** terom@196: * Read into the buffer from a transport_t. terom@196: * terom@196: * This will attempt to read \a len bytes onto the end of the buffer, growing it if needed to fit. terom@196: * terom@196: * This may read/return more data than the given len. Use msg_buf_drain the remove the data from the buffer once you terom@196: * have used it. terom@196: * terom@196: * Returns the number of new bytes read, zero for transport read buffer empty, -err_t for error. terom@196: */ terom@196: ssize_t msg_buf_read (struct msg_buf *buf, transport_t *transport, size_t len, error_t *err) terom@196: { terom@196: ssize_t ret; terom@196: terom@196: // clamp size terom@196: if (len < MSG_BUF_MIN_SIZE) terom@196: len = MSG_BUF_MIN_SIZE; terom@196: terom@196: // ensure space terom@196: if ((ERROR_CODE(err) = msg_buf_grow(buf, buf->off + len))) terom@196: goto error; terom@196: terom@196: // read terom@196: if ((ret = transport_read(transport, buf->base + buf->off, len, err)) < 0) terom@196: goto error; terom@196: terom@196: // no data left? terom@196: if (!ret) terom@196: return 0; terom@196: terom@196: // update offset terom@196: buf->off += ret; terom@196: terom@196: // ok terom@196: return ret; terom@196: terom@196: error: terom@196: return -ERROR_CODE(err); terom@196: } terom@196: terom@196: /** terom@196: * Drives transport_write on the given data until all the given data is written, or zero is returned. terom@196: * terom@196: * @param transport transport to write to terom@196: * @param data input data terom@196: * @param len number of bytes to write from data terom@196: * @param err returned error info terom@196: * @return number of bytes written (which may be zero or less than len), or -err_t. terom@196: */ terom@196: static ssize_t _transport_write_all (transport_t *transport, const char *data, size_t len, error_t *err) terom@196: { terom@196: ssize_t ret; terom@196: size_t written = 0; terom@196: terom@196: while (len) { terom@196: // try and write out remaining data terom@196: if ((ret = transport_write(transport, data, len, err)) < 0) terom@196: goto error; terom@196: terom@196: if (!ret) { terom@196: // write buffer full terom@196: break; terom@196: terom@196: } else { terom@196: // update and continue terom@196: written += ret; terom@196: data += ret; terom@196: len -= ret; terom@196: } terom@196: } terom@196: terom@196: // ok terom@196: return written; terom@196: terom@196: error: terom@196: return -ERROR_CODE(err); terom@196: } terom@196: terom@196: /** terom@196: * If the buffer is empty, this will attempt to write the given data directly using transport_write until either all terom@196: * the data is written (in which case nothing more needs to be done), or the transport won't accept any more writes, terom@196: * in which case the remaining data will be buffered. terom@196: * terom@196: * If the buffer is not empty, then the given data will be added to the end of the buffer, since otherwise the order of terom@196: * data would be broken. terom@196: * terom@196: * In either case, transport_write semantics garuntee that our buffer will either be empty, or an on_write will be terom@196: * pending on the transport. See msg_buf_flush() for how to handle transport_callbacks::on_write. terom@196: */ terom@196: err_t msg_buf_write (struct msg_buf *buf, transport_t *transport, const void *data_ptr, size_t len, error_t *err) terom@196: { terom@196: ssize_t ret; terom@196: const char *data = data_ptr; terom@196: terom@196: if (!buf->off) { terom@196: // no data buffered, so we can try and write directly terom@196: if ((ret = _transport_write_all(transport, data, len, err)) < 0) terom@196: goto error; terom@196: terom@196: // update written terom@196: data += ret; terom@196: len -= ret; terom@196: terom@196: if (len == 0) terom@196: // wrote it all terom@196: return SUCCESS; terom@196: } terom@196: terom@196: // ensure space terom@196: if ((ERROR_CODE(err) = msg_buf_grow(buf, buf->off + len))) terom@196: goto error; terom@196: terom@196: // store terom@196: memcpy(buf->base + buf->off, data, len); terom@196: terom@196: // update terom@196: buf->off += len; terom@196: terom@196: // ok terom@196: return SUCCESS; terom@196: terom@196: error: terom@196: return ERROR_CODE(err); terom@196: } terom@196: terom@196: /** terom@196: * Flush buffered write data to the transport, driving transport_write() until either all of our bufferd data has been terom@196: * written, or the transport will not accept any more. terom@196: * terom@196: * In either case, transport_write semantics garuntee that our buffer will either be empty, or an on_write will be terom@196: * pending on the transport. terom@196: */ terom@196: err_t msg_buf_flush (struct msg_buf *buf, transport_t *transport, error_t *err) terom@196: { terom@196: ssize_t ret; terom@196: terom@196: // write terom@196: if ((ret = _transport_write_all(transport, buf->base, buf->off, err)) < 0) terom@196: goto error; terom@196: terom@196: if (ret) terom@196: // unbuffer the written data terom@196: msg_buf_drain(buf, ret); terom@196: terom@196: // ok terom@196: return SUCCESS; terom@196: terom@196: error: terom@196: return ERROR_CODE(err); terom@196: } terom@196: terom@196: /** terom@196: * Deinitialize msg_buf to release allocated buffers terom@196: */ terom@196: void msg_buf_deinit (struct msg_buf *buf) terom@196: { terom@196: // release terom@196: free(buf->base); terom@196: terom@196: // reset terom@196: buf->base = NULL; terom@196: buf->size = buf->off = 0; terom@196: } terom@196: terom@196: /** terom@196: * Message header terom@196: */ terom@196: struct msg_header { terom@196: /** Message length, including header */ terom@196: uint16_t len; terom@196: }; terom@196: terom@196: /** terom@196: * Message header size terom@196: */ terom@196: #define MSG_PROTO_HEADER_SIZE (sizeof(uint16_t)) terom@196: terom@196: /** terom@196: * Our state struct terom@196: */ terom@196: struct msg_proto { terom@196: /** The transport */ terom@196: transport_t *transport; terom@196: terom@196: /** User callbacks */ terom@196: const struct msg_proto_callbacks *cb_tbl; terom@196: terom@196: /** User callback argument */ terom@196: void *cb_arg; terom@196: terom@196: /** Input buffer */ terom@196: struct msg_buf in; terom@196: terom@196: /** Output buffer */ terom@196: struct msg_buf out; terom@196: }; terom@196: terom@196: /** terom@196: * Signal error to user terom@196: */ terom@196: static void msg_proto_error (struct msg_proto *proto, const error_t *err) terom@196: { terom@196: // invoke user callback terom@196: proto->cb_tbl->on_error(proto, err, proto->cb_arg); terom@196: } terom@196: terom@196: /** terom@196: * Attempt to read the current header from our input buffer. terom@196: * terom@196: * Returns >0 for full header, 0 for incomplete header, -err_t for error. terom@196: */ terom@196: static int msg_proto_peek_header (struct msg_proto *proto, struct msg_header *header, error_t *err) terom@196: { terom@196: if (proto->in.off < MSG_PROTO_HEADER_SIZE) terom@196: // not enough data for header terom@196: return 0; terom@196: terom@196: // read header terom@196: header->len = ntohs(*((uint16_t *) proto->in.base)); terom@196: terom@196: // bad header? terom@196: if (header->len < MSG_PROTO_HEADER_SIZE) terom@196: JUMP_SET_ERROR_STR(err, ERR_MISC, "message_header::len"); terom@196: terom@196: // ok, got header terom@196: return 1; terom@196: terom@196: error: terom@196: return -ERROR_CODE(err); terom@196: } terom@196: terom@196: /** terom@196: * Recieved a message with the given header, and a pointer to the message data terom@196: * terom@196: * XXX: what to do if the user callback destroys the msg_proto? terom@196: */ terom@196: static err_t msg_proto_on_msg (struct msg_proto *proto, struct msg_header *header, char *data, error_t *err) terom@196: { terom@196: (void) err; terom@196: terom@196: // invoke user callback terom@196: proto->cb_tbl->on_msg(proto, data, header->len - MSG_PROTO_HEADER_SIZE, proto->cb_arg); terom@196: terom@196: // XXX: handle user errors terom@196: return SUCCESS; terom@196: } terom@196: terom@196: static void msg_proto_on_read (transport_t *transport, void *arg) terom@196: { terom@196: struct msg_proto *proto = arg; terom@196: struct msg_header header; terom@196: ssize_t ret; terom@196: error_t err; terom@196: terom@196: // we might be able to read more than one message per event terom@196: do { terom@196: // try and read message length for incomplete message terom@196: if ((ret = msg_proto_peek_header(proto, &header, &err)) < 0) terom@196: goto error; terom@196: terom@196: // need to read more data? terom@196: if (!ret || header.len > proto->in.off) { terom@196: // msg_buf_read a minimum size, so passing a zero is OK terom@196: size_t to_read = ret ? header.len : 0; terom@196: terom@196: // read into our buffer terom@196: if ((ret = msg_buf_read(&proto->in, transport, to_read, &err)) < 0) terom@196: goto error; terom@196: terom@196: } else { terom@196: // handle full message terom@196: if (msg_proto_on_msg(proto, &header, proto->in.base + MSG_PROTO_HEADER_SIZE, &err)) terom@196: goto error; terom@196: terom@196: // remove the data from the buffer terom@196: msg_buf_drain(&proto->in, header.len); terom@196: } terom@196: } while (ret); terom@196: terom@196: // ok terom@196: return; terom@196: terom@196: error: terom@196: // notify user terom@196: msg_proto_error(proto, &err); terom@196: } terom@196: terom@196: static void msg_proto_on_write (transport_t *transport, void *arg) terom@196: { terom@196: struct msg_proto *proto = arg; terom@196: error_t err; terom@196: terom@196: // flush terom@196: if (msg_buf_flush(&proto->out, transport, &err)) terom@196: // notify user on transport errors terom@196: msg_proto_error(proto, &err); terom@196: } terom@196: terom@196: static void msg_proto_on_error (transport_t *transport, const error_t *err, void *arg) terom@196: { terom@196: struct msg_proto *proto = arg; terom@196: terom@196: (void) transport; terom@196: terom@196: // report to user terom@196: msg_proto_error(proto, err); terom@196: } terom@196: terom@196: static const struct transport_callbacks msg_proto_transport_callbacks = { terom@196: .on_read = msg_proto_on_read, terom@196: .on_write = msg_proto_on_write, terom@196: .on_error = msg_proto_on_error, terom@196: }; terom@196: terom@196: err_t msg_proto_create (struct msg_proto **proto_ptr, transport_t *transport, const struct msg_proto_callbacks *cb_tbl, void *cb_arg, error_t *err) terom@196: { terom@196: struct msg_proto *proto; terom@196: terom@196: // alloc terom@196: if ((proto = calloc(1, sizeof(*proto))) == NULL) terom@196: return ERR_MEM; terom@196: terom@196: // store terom@196: proto->transport = transport; terom@196: proto->cb_tbl = cb_tbl; terom@196: proto->cb_arg = cb_arg; terom@196: terom@196: // init terom@196: if ( terom@196: (ERROR_CODE(err) = msg_buf_init(&proto->in, 0)) terom@196: || (ERROR_CODE(err) = msg_buf_init(&proto->out, 0)) terom@196: ) terom@196: goto error; terom@196: terom@196: // setup transport terom@196: if ((ERROR_CODE(err) = transport_events(transport, TRANSPORT_READ | TRANSPORT_WRITE))) terom@196: goto error; terom@196: terom@196: transport_set_callbacks(transport, &msg_proto_transport_callbacks, proto); terom@196: terom@196: // ok terom@196: *proto_ptr = proto; terom@196: terom@196: return SUCCESS; terom@196: terom@196: error: terom@196: // release terom@196: msg_proto_destroy(proto); terom@196: terom@196: return ERROR_CODE(err); terom@196: } terom@196: terom@196: /** terom@196: * Build and write out the data for the given header terom@196: */ terom@196: static err_t msg_proto_write_header (struct msg_proto *proto, const struct msg_header *header, error_t *err) terom@196: { terom@196: char buf[MSG_PROTO_HEADER_SIZE]; terom@196: terom@196: // validate terom@196: if (header->len < MSG_PROTO_HEADER_SIZE) terom@196: return SET_ERROR(err, ERR_MISC); terom@196: terom@196: // build terom@196: *((uint16_t *) buf) = htons(header->len); terom@196: terom@196: // write terom@196: return msg_buf_write(&proto->out, proto->transport, buf, sizeof(buf), err); terom@196: } terom@196: terom@196: err_t msg_proto_send (struct msg_proto *proto, const void *data, size_t len, error_t *err) terom@196: { terom@196: struct msg_header header; terom@196: terom@196: // build header terom@196: header.len = MSG_PROTO_HEADER_SIZE + len; terom@196: terom@196: // write it terom@196: if ( terom@196: msg_proto_write_header(proto, &header, err) terom@196: || msg_buf_write(&proto->out, proto->transport, data, len, err) terom@196: ) terom@196: return ERROR_CODE(err); terom@196: terom@196: // ok terom@196: return SUCCESS; terom@196: } terom@196: terom@196: void msg_proto_destroy (struct msg_proto *proto) terom@196: { terom@196: // drop buffers terom@196: msg_buf_deinit(&proto->in); terom@196: msg_buf_deinit(&proto->out); terom@196: terom@196: // kill transport terom@196: transport_destroy(proto->transport); terom@196: terom@196: // release ourself terom@196: free(proto); terom@196: } terom@196: