src/msg_proto.c
branchnew-lib-errors
changeset 219 cefec18b8268
parent 218 5229a5d098b2
equal deleted inserted replaced
218:5229a5d098b2 219:cefec18b8268
     1 #include "msg_proto.h"
       
     2 
       
     3 #include <string.h>
       
     4 #include <stdint.h>
       
     5 #include <arpa/inet.h>
       
     6 
       
     7 /**
       
     8  * I/O buffer
       
     9  */
       
    10 struct msg_buf {
       
    11     /** Buffer base pointer */
       
    12     char *base;
       
    13 
       
    14     /** Size of the buffer */
       
    15     size_t size;
       
    16 
       
    17     /** Current read/write offset */
       
    18     size_t off;
       
    19 };
       
    20 
       
    21 /**
       
    22  * The minimum size used for any msg_buf::size related operation.
       
    23  */
       
    24 #define MSG_BUF_MIN_SIZE 1024
       
    25 
       
    26 /**
       
    27  * Growth rate for size
       
    28  */
       
    29 #define MSG_BUF_GROW_RATE 2
       
    30 
       
    31 /**
       
    32  * Initialize a message buffer at the given initial size
       
    33  */
       
    34 err_t msg_buf_init (struct msg_buf *buf, size_t hint)
       
    35 {
       
    36     // apply minimum size
       
    37     if (hint < MSG_BUF_MIN_SIZE)
       
    38         hint = MSG_BUF_MIN_SIZE;
       
    39 
       
    40     // allocate the initial buffer
       
    41     if ((buf->base = malloc(hint)) == NULL)
       
    42         return ERR_MEM;
       
    43     
       
    44     // set fields
       
    45     buf->size = hint;
       
    46     buf->off = 0;
       
    47 
       
    48     // ok
       
    49     return SUCCESS;
       
    50 }
       
    51 
       
    52 /**
       
    53  * Grow the buffer if needed to fit the given capacity.
       
    54  */
       
    55 err_t msg_buf_grow (struct msg_buf *buf, size_t size)
       
    56 {
       
    57     char *tmp = buf->base;
       
    58 
       
    59     if (buf->size >= size)
       
    60         // nothing to do
       
    61         return SUCCESS;
       
    62 
       
    63     // calculate new size
       
    64     while (buf->size < size)
       
    65         buf->size *= MSG_BUF_GROW_RATE;
       
    66 
       
    67     // resize
       
    68     if ((buf->base = realloc(buf->base, buf->size)) == NULL) {
       
    69         buf->base = tmp;
       
    70 
       
    71         return ERR_MEM;
       
    72     }
       
    73 
       
    74     // ok
       
    75     return SUCCESS;
       
    76 }
       
    77 
       
    78 /**
       
    79  * Drain \a len bytes off the head of the buffer
       
    80  */
       
    81 err_t msg_buf_drain (struct msg_buf *buf, size_t len)
       
    82 {
       
    83     // simple memmove
       
    84     memmove(buf->base, buf->base + len, buf->off - len);
       
    85 
       
    86     // update offfset
       
    87     buf->off -= len;
       
    88     
       
    89     // ok
       
    90     return SUCCESS;
       
    91 }
       
    92 
       
    93 /**
       
    94  * Read into the buffer from a transport_t.
       
    95  *
       
    96  * This will attempt to read \a len bytes onto the end of the buffer, growing it if needed to fit.
       
    97  *
       
    98  * This may read/return more data than the given len. Use msg_buf_drain the remove the data from the buffer once you
       
    99  * have used it.
       
   100  *
       
   101  * Returns the number of new bytes read, zero for transport read buffer empty, -err_t for error.
       
   102  */
       
   103 ssize_t msg_buf_read (struct msg_buf *buf, transport_t *transport, size_t len, error_t *err)
       
   104 {
       
   105     ssize_t ret;
       
   106 
       
   107     // clamp size
       
   108     if (len < MSG_BUF_MIN_SIZE)
       
   109         len = MSG_BUF_MIN_SIZE;
       
   110 
       
   111     // ensure space
       
   112     if ((ERROR_CODE(err) = msg_buf_grow(buf, buf->off + len)))
       
   113         goto error;
       
   114 
       
   115     // read
       
   116     if ((ret = transport_read(transport, buf->base + buf->off, len, err)) < 0)
       
   117         goto error;
       
   118     
       
   119     // no data left?
       
   120     if (!ret)
       
   121         return 0;
       
   122 
       
   123     // update offset
       
   124     buf->off += ret;
       
   125 
       
   126     // ok
       
   127     return ret;
       
   128 
       
   129 error:
       
   130     return -ERROR_CODE(err);    
       
   131 }
       
   132 
       
   133 /**
       
   134  * Drives transport_write on the given data until all the given data is written, or zero is returned.
       
   135  *
       
   136  * @param transport transport to write to
       
   137  * @param data input data
       
   138  * @param len number of bytes to write from data
       
   139  * @param err returned error info
       
   140  * @return number of bytes written (which may be zero or less than len), or -err_t.
       
   141  */
       
   142 static ssize_t _transport_write_all (transport_t *transport, const char *data, size_t len, error_t *err)
       
   143 {
       
   144     ssize_t ret;
       
   145     size_t written = 0;
       
   146 
       
   147     while (len) {
       
   148         // try and write out remaining data
       
   149         if ((ret = transport_write(transport, data, len, err)) < 0)
       
   150             goto error;
       
   151         
       
   152         if (!ret) {
       
   153             // write buffer full
       
   154             break;
       
   155 
       
   156         } else { 
       
   157             // update and continue
       
   158             written += ret;
       
   159             data += ret;
       
   160             len -= ret;
       
   161         }
       
   162     }
       
   163 
       
   164     // ok
       
   165     return written;
       
   166 
       
   167 error:
       
   168     return -ERROR_CODE(err);    
       
   169 }
       
   170 
       
   171 /**
       
   172  * If the buffer is empty, this will attempt to write the given data directly using transport_write until either all
       
   173  * the data is written (in which case nothing more needs to be done), or the transport won't accept any more writes,
       
   174  * in which case the remaining data will be buffered.
       
   175  *
       
   176  * If the buffer is not empty, then the given data will be added to the end of the buffer, since otherwise the order of
       
   177  * data would be broken.
       
   178  *
       
   179  * In either case, transport_write semantics garuntee that our buffer will either be empty, or an on_write will be
       
   180  * pending on the transport. See msg_buf_flush() for how to handle transport_callbacks::on_write.
       
   181  */
       
   182 err_t msg_buf_write (struct msg_buf *buf, transport_t *transport, const void *data_ptr, size_t len, error_t *err)
       
   183 {
       
   184     ssize_t ret;
       
   185     const char *data = data_ptr;
       
   186 
       
   187     if (!buf->off) {
       
   188         // no data buffered, so we can try and write directly
       
   189         if ((ret = _transport_write_all(transport, data, len, err)) < 0)
       
   190             goto error;
       
   191     
       
   192         // update written
       
   193         data += ret;
       
   194         len -= ret;
       
   195         
       
   196         if (len == 0)
       
   197             // wrote it all
       
   198             return SUCCESS;
       
   199     }
       
   200 
       
   201     // ensure space
       
   202     if ((ERROR_CODE(err) = msg_buf_grow(buf, buf->off + len)))
       
   203         goto error;
       
   204 
       
   205     // store
       
   206     memcpy(buf->base + buf->off, data, len);
       
   207     
       
   208     // update
       
   209     buf->off += len;
       
   210 
       
   211     // ok
       
   212     return SUCCESS;
       
   213 
       
   214 error:
       
   215     return ERROR_CODE(err);    
       
   216 }
       
   217 
       
   218 /**
       
   219  * Flush buffered write data to the transport, driving transport_write() until either all of our bufferd data has been
       
   220  * written, or the transport will not accept any more.
       
   221  *
       
   222  * In either case, transport_write semantics garuntee that our buffer will either be empty, or an on_write will be
       
   223  * pending on the transport.
       
   224  */
       
   225 err_t msg_buf_flush (struct msg_buf *buf, transport_t *transport, error_t *err)
       
   226 {
       
   227     ssize_t ret;
       
   228 
       
   229     // write
       
   230     if ((ret = _transport_write_all(transport, buf->base, buf->off, err)) < 0)
       
   231         goto error;
       
   232     
       
   233     if (ret)
       
   234         // unbuffer the written data
       
   235         msg_buf_drain(buf, ret);
       
   236 
       
   237     // ok
       
   238     return SUCCESS;
       
   239 
       
   240 error:
       
   241     return ERROR_CODE(err);    
       
   242 }
       
   243 
       
   244 /**
       
   245  * Deinitialize msg_buf to release allocated buffers
       
   246  */
       
   247 void msg_buf_deinit (struct msg_buf *buf)
       
   248 {
       
   249     // release
       
   250     free(buf->base);
       
   251 
       
   252     // reset
       
   253     buf->base = NULL;
       
   254     buf->size = buf->off = 0;
       
   255 }
       
   256 
       
   257 /**
       
   258  * Message header
       
   259  */
       
   260 struct msg_header {
       
   261     /** Message length, including header */
       
   262     uint16_t len;
       
   263 };
       
   264 
       
   265 /**
       
   266  * Message header size
       
   267  */
       
   268 #define MSG_PROTO_HEADER_SIZE (sizeof(uint16_t))
       
   269 
       
   270 /**
       
   271  * Our state struct
       
   272  */
       
   273 struct msg_proto {
       
   274     /** The transport */
       
   275     transport_t *transport;
       
   276 
       
   277     /** User callbacks */
       
   278     const struct msg_proto_callbacks *cb_tbl;
       
   279 
       
   280     /** User callback argument */
       
   281     void *cb_arg;
       
   282 
       
   283     /** Input buffer */
       
   284     struct msg_buf in;
       
   285 
       
   286     /** Output buffer */
       
   287     struct msg_buf out;
       
   288 };
       
   289 
       
   290 /**
       
   291  * Signal error to user
       
   292  */
       
   293 static void msg_proto_error (struct msg_proto *proto, const error_t *err)
       
   294 {
       
   295     // invoke user callback
       
   296     proto->cb_tbl->on_error(proto, err, proto->cb_arg);
       
   297 }
       
   298 
       
   299 /**
       
   300  * Attempt to read the current header from our input buffer.
       
   301  *
       
   302  * Returns >0 for full header, 0 for incomplete header, -err_t for error.
       
   303  */
       
   304 static int msg_proto_peek_header (struct msg_proto *proto, struct msg_header *header, error_t *err)
       
   305 {
       
   306     if (proto->in.off < MSG_PROTO_HEADER_SIZE)
       
   307         // not enough data for header
       
   308         return 0;
       
   309 
       
   310     // read header
       
   311     header->len = ntohs(*((uint16_t *) proto->in.base));
       
   312 
       
   313     // bad header?
       
   314     if (header->len < MSG_PROTO_HEADER_SIZE)
       
   315         JUMP_SET_ERROR_STR(err, ERR_MISC, "message_header::len");
       
   316 
       
   317     // ok, got header
       
   318     return 1;
       
   319 
       
   320 error:
       
   321     return -ERROR_CODE(err);    
       
   322 }
       
   323 
       
   324 /**
       
   325  * Recieved a message with the given header, and a pointer to the message data
       
   326  *
       
   327  * XXX: what to do if the user callback destroys the msg_proto?
       
   328  */
       
   329 static err_t msg_proto_on_msg (struct msg_proto *proto, struct msg_header *header, char *data, error_t *err)
       
   330 {
       
   331     (void) err;
       
   332 
       
   333     // invoke user callback
       
   334     proto->cb_tbl->on_msg(proto, data, header->len - MSG_PROTO_HEADER_SIZE, proto->cb_arg);
       
   335 
       
   336     // XXX: handle user errors
       
   337     return SUCCESS;
       
   338 }
       
   339 
       
   340 static void msg_proto_on_read (transport_t *transport, void *arg)
       
   341 {
       
   342     struct msg_proto *proto = arg;
       
   343     struct msg_header header;
       
   344     ssize_t ret;
       
   345     error_t err;
       
   346     
       
   347     // we might be able to read more than one message per event
       
   348     do {
       
   349         // try and read message length for incomplete message
       
   350         if ((ret = msg_proto_peek_header(proto, &header, &err)) < 0)
       
   351             goto error;
       
   352         
       
   353         // need to read more data?
       
   354         if (!ret || header.len > proto->in.off) {
       
   355             // msg_buf_read a minimum size, so passing a zero is OK
       
   356             size_t to_read = ret ? header.len : 0;
       
   357 
       
   358             // read into our buffer
       
   359             if ((ret = msg_buf_read(&proto->in, transport, to_read, &err)) < 0)
       
   360                 goto error;
       
   361     
       
   362         } else {
       
   363             // handle full message
       
   364             if (msg_proto_on_msg(proto, &header, proto->in.base + MSG_PROTO_HEADER_SIZE, &err))
       
   365                 goto error;
       
   366             
       
   367             // remove the data from the buffer
       
   368             msg_buf_drain(&proto->in, header.len);
       
   369         }
       
   370     } while (ret);
       
   371     
       
   372     // ok
       
   373     return;
       
   374 
       
   375 error:
       
   376     // notify user
       
   377     msg_proto_error(proto, &err);    
       
   378 }
       
   379 
       
   380 static void msg_proto_on_write (transport_t *transport, void *arg)
       
   381 {
       
   382     struct msg_proto *proto = arg;
       
   383     error_t err;
       
   384 
       
   385     // flush
       
   386     if (msg_buf_flush(&proto->out, transport, &err))
       
   387         // notify user on transport errors
       
   388         msg_proto_error(proto, &err);
       
   389 }
       
   390 
       
   391 static void msg_proto_on_error (transport_t *transport, const error_t *err, void *arg)
       
   392 {
       
   393     struct msg_proto *proto = arg;
       
   394 
       
   395     (void) transport;
       
   396 
       
   397     // report to user
       
   398     msg_proto_error(proto, err);
       
   399 }
       
   400 
       
   401 static const struct transport_callbacks msg_proto_transport_callbacks = {
       
   402     .on_read    = msg_proto_on_read,
       
   403     .on_write   = msg_proto_on_write,
       
   404     .on_error   = msg_proto_on_error,
       
   405 };
       
   406 
       
   407 err_t msg_proto_create (struct msg_proto **proto_ptr, transport_t *transport, const struct msg_proto_callbacks *cb_tbl, void *cb_arg, error_t *err)
       
   408 {
       
   409     struct msg_proto *proto;
       
   410 
       
   411     // alloc
       
   412     if ((proto = calloc(1, sizeof(*proto))) == NULL)
       
   413         return ERR_MEM;
       
   414 
       
   415     // store
       
   416     proto->transport = transport;
       
   417     proto->cb_tbl = cb_tbl;
       
   418     proto->cb_arg = cb_arg;
       
   419 
       
   420     // init
       
   421     if (
       
   422             (ERROR_CODE(err) = msg_buf_init(&proto->in, 0))
       
   423         ||  (ERROR_CODE(err) = msg_buf_init(&proto->out, 0))
       
   424     )
       
   425         goto error;
       
   426 
       
   427     // setup transport
       
   428     if ((ERROR_CODE(err) = transport_events(transport, TRANSPORT_READ | TRANSPORT_WRITE)))
       
   429         goto error;
       
   430 
       
   431     transport_set_callbacks(transport, &msg_proto_transport_callbacks, proto);
       
   432 
       
   433     // ok
       
   434     *proto_ptr = proto;
       
   435 
       
   436     return SUCCESS;
       
   437 
       
   438 error:
       
   439     // release
       
   440     msg_proto_destroy(proto);
       
   441 
       
   442     return ERROR_CODE(err);
       
   443 }
       
   444 
       
   445 /**
       
   446  * Build and write out the data for the given header
       
   447  */
       
   448 static err_t msg_proto_write_header (struct msg_proto *proto, const struct msg_header *header, error_t *err)
       
   449 {
       
   450     char buf[MSG_PROTO_HEADER_SIZE];
       
   451 
       
   452     // validate
       
   453     if (header->len < MSG_PROTO_HEADER_SIZE)
       
   454         return SET_ERROR(err, ERR_MISC);
       
   455 
       
   456     // build
       
   457     *((uint16_t *) buf) = htons(header->len);
       
   458 
       
   459     // write
       
   460     return msg_buf_write(&proto->out, proto->transport, buf, sizeof(buf), err);
       
   461 }
       
   462 
       
   463 err_t msg_proto_send (struct msg_proto *proto, const void *data, size_t len, error_t *err)
       
   464 {
       
   465     struct msg_header header;
       
   466 
       
   467     // build header
       
   468     header.len = MSG_PROTO_HEADER_SIZE + len;
       
   469 
       
   470     // write it
       
   471     if (
       
   472             msg_proto_write_header(proto, &header, err)
       
   473         ||  msg_buf_write(&proto->out, proto->transport, data, len, err)
       
   474     )
       
   475         return ERROR_CODE(err);
       
   476 
       
   477     // ok
       
   478     return SUCCESS;
       
   479 }
       
   480 
       
   481 void msg_proto_destroy (struct msg_proto *proto)
       
   482 {
       
   483     // drop buffers
       
   484     msg_buf_deinit(&proto->in);
       
   485     msg_buf_deinit(&proto->out);
       
   486 
       
   487     // kill transport
       
   488     transport_destroy(proto->transport);
       
   489 
       
   490     // release ourself
       
   491     free(proto);
       
   492 }
       
   493