src/lib/line_proto.c
branch new-lib-errors
changeset 219 cefec18b8268
parent 168 a58ad50911fc
equal deleted inserted replaced
218:5229a5d098b2 219:cefec18b8268
       
     1 
       
     2 #include "line_proto.h"
       
     3 #include "log.h"
       
     4 
       
     5 #include <string.h>
       
     6 #include <stdlib.h>
       
     7 #include <assert.h>
       
     8 
       
     9 /*
       
    10  * Our state
       
    11  */
       
    12 struct line_proto {
       
    13     /* The transport we read/write with */
       
    14     transport_t *transport;
       
    15 
       
    16     /* The incoming/outgoing line buffer */
       
    17     char *in_buf, *out_buf;
       
    18 
       
    19     /* Buffer size (same for both) */
       
    20     size_t buf_len;
       
    21 
       
    22     /* Offset of trailing data in buf */
       
    23     size_t tail_offset;
       
    24 
       
    25     /* Length of trailing data in buf, if any */
       
    26     size_t tail_len;
       
    27 
       
    28     /* Amount of data in the out buffer */
       
    29     size_t out_offset;
       
    30 
       
    31     /* Last error */
       
    32     struct error_info err;
       
    33 
       
    34     /* Callback info */
       
    35     struct line_proto_callbacks callbacks;
       
    36     void *cb_arg;
       
    37 };
       
    38 
       
    39 /**
       
    40  * An error occured which we could not recover from; the line_proto should now be considered corrupt.
       
    41  *
       
    42  * Notify the user callback, which will probably call line_proto_release().
       
    43  */
       
    44 static void line_proto_set_error (struct line_proto *lp)
       
    45 {
       
    46     // copy error_info, as it might get free'd
       
    47     struct error_info err = lp->err;
       
    48 
       
    49     // trigger callback
       
    50     lp->callbacks.on_error(&err, lp->cb_arg);
       
    51 }
       
    52 
       
    53 /**
       
    54  * Our transport_callbacks::on_read handler
       
    55  */
       
    56 static void line_proto_on_read (transport_t *transport, void *arg)
       
    57 {
       
    58     struct line_proto *lp = arg;
       
    59     char *line;
       
    60 
       
    61     (void) transport;
       
    62 
       
    63     // sanity-check
       
    64     assert(lp->tail_offset < lp->buf_len);
       
    65     
       
    66     do {
       
    67         // attempt to read a line
       
    68         if (line_proto_recv(lp, &line))
       
    69             // faaail
       
    70             return line_proto_set_error(lp);
       
    71 
       
    72         // got a line?
       
    73         if (line)
       
    74             lp->callbacks.on_line(line, lp->cb_arg);
       
    75 
       
    76     } while (line);
       
    77 }
       
    78 
       
    79 /*
       
    80  * Signal for write
       
    81  */
       
    82 static void line_proto_on_write (transport_t *transport, void *arg)
       
    83 {
       
    84     struct line_proto *lp = arg;
       
    85     int ret;
       
    86 
       
    87     (void) transport;
       
    88 
       
    89     // just flush
       
    90     if ((ret = line_proto_flush(lp)) < 0)
       
    91         // faaail
       
    92         return line_proto_set_error(lp);
       
    93 }
       
    94 
       
    95 // XXX: implement on_error!
       
    96 static const struct transport_callbacks line_proto_transport_callbacks = {
       
    97     .on_read    = &line_proto_on_read,
       
    98     .on_write   = &line_proto_on_write,
       
    99 };
       
   100 
       
   101 err_t line_proto_create (struct line_proto **lp_ptr, transport_t *transport, size_t buf_size,
       
   102         const struct line_proto_callbacks *callbacks, void *cb_arg, error_t *err)
       
   103 {
       
   104     struct line_proto *lp;
       
   105 
       
   106     // alloc
       
   107     if ((lp = calloc(1, sizeof(*lp))) == NULL)
       
   108         return SET_ERROR(err, ERR_CALLOC);
       
   109 
       
   110     // store
       
   111     lp->transport = transport;
       
   112     lp->buf_len = buf_size;
       
   113     lp->callbacks = *callbacks;
       
   114     lp->cb_arg = cb_arg;
       
   115 
       
   116     // allocate buffers
       
   117     if (
       
   118             (lp->in_buf = malloc(buf_size)) == NULL
       
   119         ||  (lp->out_buf = malloc(buf_size)) == NULL
       
   120     )
       
   121         JUMP_SET_ERROR(err, ERR_CALLOC);
       
   122 
       
   123     // setup the transport
       
   124     transport_set_callbacks(transport, &line_proto_transport_callbacks, lp);
       
   125     
       
   126     if ((ERROR_CODE(err) = transport_events(transport, TRANSPORT_READ | TRANSPORT_WRITE)))
       
   127         goto error;
       
   128 
       
   129     // return
       
   130     *lp_ptr = lp;
       
   131 
       
   132     return SUCCESS;
       
   133 
       
   134 error:
       
   135     // cleanup the lp
       
   136     line_proto_destroy(lp);
       
   137 
       
   138     return ERROR_CODE(err);
       
   139 }
       
   140 
       
   141 /*
       
   142  * This looks for a full '\r\n' terminated line at the beginning of the given buffer. If found, the \r\n will be
       
   143  * replaced with a '\0', and the offset to the beginning of the next line returned. If not found, zero is returned
       
   144  * (which is never a valid next-line offset).
       
   145  *
       
   146  * The given \a hint is an hint as to the offset at which to start scanning, used for incremental invocations of this
       
   147  * on the same buffer.
       
   148  *
       
   149  */
       
   150 int _parse_line (char *buf, size_t len, size_t *hint) {
       
   151     size_t i, next = 0;
       
   152 
       
   153     // empty buffer -> nothing
       
   154     if (len == 0)
       
   155         return 0;
       
   156 
       
   157     // look for terminating '\r\n' or '\n' sequence
       
   158     for (i = *hint; i < len; i++) {
       
   159         // match this + next char?
       
   160         if (i < len - 1 && buf[i] == '\r' && buf[i + 1] == '\n') {
       
   161             next = i + 2;
       
   162             break;
       
   163 
       
   164         } else if (buf[i] == '\n') {
       
   165             next = i + 1;
       
   166             break;
       
   167         }
       
   168     }
       
   169 
       
   170     // searched the whole buffer?
       
   171     if (i >= len) {
       
   172         // do continue one char back, to keep any \r
       
   173         *hint = len - 1;
       
   174         return 0;
       
   175     }
       
   176 
       
   177     // mangle the newline off
       
   178     buf[i] = '\0';
       
   179 
       
   180     // return offset to next line, as set in loop based on delim
       
   181     return next;
       
   182 }
       
   183 
       
   184 err_t line_proto_recv (struct line_proto *lp, char **line_ptr)
       
   185 {
       
   186     // offset to recv() new data into, offset to _parse_line hint, offset to next line from _parse_line
       
   187     size_t recv_offset = 0, peek_offset = 0, next_offset = 0;
       
   188     int ret;
       
   189 
       
   190     // adjust offset to beyond previous data (as will be moved next)
       
   191     recv_offset = lp->tail_len;
       
   192 
       
   193     // move trailing data from previous line to front of buffer
       
   194     if (lp->tail_offset) {
       
   195         // move to front, no-op if tail_len is zero
       
   196         memmove(lp->in_buf, lp->in_buf + lp->tail_offset, lp->tail_len);
       
   197 
       
   198         // reset
       
   199         lp->tail_offset = 0;
       
   200         lp->tail_len = 0;
       
   201     }
       
   202 
       
   203     // readline loop
       
   204     do {
       
   205         // parse any line at the beginning of the buffer
       
   206         if ((next_offset = _parse_line(lp->in_buf, recv_offset, &peek_offset)) > 0) {
       
   207             // store a valid *line_ptr
       
   208             *line_ptr = lp->in_buf;
       
   209             
       
   210             // exit loop and return
       
   211             break;
       
   212         }
       
   213 
       
   214         // ensure there's enough space for the rest of the line
       
   215         if (recv_offset >= lp->buf_len)
       
   216             return ERR_LINE_TOO_LONG;
       
   217         
       
   218         // otherwise, read more data
       
   219         if ((ret = transport_read(lp->transport, lp->in_buf + recv_offset, lp->buf_len - recv_offset, &lp->err)) < 0)
       
   220             return ERROR_CODE(&lp->err);
       
   221 
       
   222         // EAGAIN?
       
   223         if (ret == 0) {
       
   224             // return a NULL *line_ptr
       
   225             *line_ptr = NULL;
       
   226             break;
       
   227         }
       
   228         
       
   229         // update recv_offset
       
   230         recv_offset += ret;
       
   231 
       
   232     } while (1);
       
   233 
       
   234     // update state for next call
       
   235     lp->tail_offset = next_offset;
       
   236     lp->tail_len = recv_offset - next_offset;
       
   237 
       
   238     // ok
       
   239     return SUCCESS;
       
   240 }
       
   241 
       
   242 int line_proto_send (struct line_proto *lp, const char *line)
       
   243 {
       
   244     int ret;
       
   245     size_t len = strlen(line), ret_len;
       
   246 
       
   247     // drop line if we already have output buffered
       
   248     if (lp->out_offset)
       
   249         return -ERR_LINE_TOO_LONG;
       
   250     
       
   251     // try and write the line
       
   252     if ((ret = transport_write(lp->transport, line, len, &lp->err)) < 0)
       
   253         return -ERROR_CODE(&lp->err);
       
   254 
       
   255     // length of the sent data
       
   256     ret_len = ret;
       
   257 
       
   258     // EAGAIN or partial?
       
   259     if (ret_len < len) {
       
   260         size_t trailing = len - ret_len;
       
   261 
       
   262         // ensure it's not waaaay too long
       
   263         if (trailing > lp->buf_len)
       
   264             return -ERR_LINE_TOO_LONG;
       
   265 
       
   266         // copy remaining portion to buffer
       
   267         memcpy(lp->out_buf, line + ret_len, trailing);
       
   268 
       
   269         // update offset
       
   270         lp->out_offset = trailing;
       
   271         
       
   272         // buffered... transport should invoke on_write itself
       
   273         return 1;
       
   274 
       
   275     } else {
       
   276         // ok, no buffering needed
       
   277         return SUCCESS;
       
   278 
       
   279     }
       
   280 }
       
   281 
       
   282 int line_proto_flush (struct line_proto *lp)
       
   283 {
       
   284     int ret;
       
   285     size_t ret_len;
       
   286 
       
   287     assert(lp->out_offset);
       
   288 
       
   289     // try and write the line
       
   290     if ((ret = transport_write(lp->transport, lp->out_buf, lp->out_offset, &lp->err)) < 0)
       
   291         return -ERROR_CODE(&lp->err);
       
   292 
       
   293     ret_len = ret;
       
   294 
       
   295     // empty now?
       
   296     if (ret_len == lp->out_offset) {
       
   297         lp->out_offset = 0;
       
   298 
       
   299         return SUCCESS;
       
   300     }
       
   301 
       
   302     // partial?
       
   303     if (ret_len > 0) {
       
   304         size_t remaining = lp->out_offset - ret_len;
       
   305 
       
   306         // move the rest up
       
   307         memmove(lp->out_buf, lp->out_buf + ret_len, remaining);
       
   308 
       
   309         // update offset
       
   310         lp->out_offset = remaining;
       
   311     }
       
   312 
       
   313     // ok
       
   314     return 1;
       
   315 }
       
   316 
       
   317 const struct error_info* line_proto_error (struct line_proto *lp)
       
   318 {
       
   319     // return pointer
       
   320     return &lp->err;
       
   321 }
       
   322 
       
   323 void line_proto_destroy (struct line_proto *lp)
       
   324 {
       
   325     // free buffers
       
   326     free(lp->in_buf);
       
   327     free(lp->out_buf);
       
   328 
       
   329     // socket?
       
   330     if (lp->transport)
       
   331         transport_destroy(lp->transport);
       
   332 
       
   333     // free the state itself
       
   334     free(lp);
       
   335 }
       
   336