src/line_proto.c
changeset 10 9fe218576d13
parent 8 be88e543c8ff
child 11 14e79683c48c
equal deleted inserted replaced
9:4c4c906cc649 10:9fe218576d13
     3 
     3 
     4 #include <string.h>
     4 #include <string.h>
     5 #include <stdlib.h>
     5 #include <stdlib.h>
     6 #include <assert.h>
     6 #include <assert.h>
     7 
     7 
       
     8 #include <err.h>
       
     9 
     8 /*
    10 /*
     9  * Our state: everything a line_proto tracks for the one sock_stream it wraps
    11  * Our state: everything a line_proto tracks for the one sock_stream it wraps
    10  */
    12  */
    11 struct line_proto {
    13 struct line_proto {
    12     /* The sock_stream we read/write with */
    14     /* The sock_stream we read/write with */
    13     struct sock_stream *sock;
    15     struct sock_stream *sock;
    14 
    16 
       
    17     /* The incoming line buffer, allocated with malloc() in line_proto_create */
       
    18     char *buf;
       
    19 
       
    20     /* Incoming buffer size in bytes (the buf_size given to line_proto_create) */
       
    21     size_t buf_len;
       
    22 
    15     /* Offset of trailing data in buf */
    23     /* Offset of trailing data in buf */
    16     size_t tail_offset;
    24     size_t tail_offset;
    17 
    25 
    18     /* Length of trailing data in buf, if any */
    26     /* Length of trailing data in buf, if any */
    19     size_t tail_len;
    27     size_t tail_len;
    20 
    28 
    21     /* Last error */
    29     /* Last error */
    22     struct error_info err;
    30     struct error_info err;
       
    31 
       
    32     /* Callback info: cb_read is called with cb_arg for each line the on_read handler reads */
       
    33     line_proto_read_cb cb_read;
       
    34     void *cb_arg;
    23 };
    35 };
    24 
    36 
    25 err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, struct error_info *err)
    37 // function prototypes: forward-declared because line_proto_on_read calls it before its definition
       
    38 static err_t line_proto_schedule_events (struct line_proto *lp, short what);
       
    39 
       
    40 /*
       
    41  * Our sock_stream on_read handler
       
        *
        * Reads lines with line_proto_read() until it yields a NULL line, passing
        * each non-NULL line to lp->cb_read together with lp->cb_arg, and then
        * re-enables EV_READ through line_proto_schedule_events().
        *
        * sock is not used by the body. arg is treated as the struct line_proto;
        * presumably it is the pointer given to sock_stream_event_init() in
        * line_proto_create - confirm against sock_stream.
        *
        * A non-zero return from line_proto_read() or line_proto_schedule_events()
        * is not recovered from: the handler calls errx(1, ...) with the message
        * from error_msg(&lp->err).
    42  */
       
    43 static void line_proto_on_read (struct sock_stream *sock, void *arg)
       
    44 {
       
    45     struct line_proto *lp = arg;
       
    46     const char *line;
       
    47 
       
    48     // sanity-check: the tail offset must lie inside the buffer
       
    49     assert(lp->tail_offset < lp->buf_len);
       
    50     
       
    51     do {
       
    52         // attempt to read a line; line_proto_read stores the result (possibly NULL) in line
       
    53         if (line_proto_read(lp, &line))
       
    54             // XXX: fail - a read error currently terminates the whole process
       
    55             errx(1, "line_proto_read: %s", error_msg(&lp->err));
       
    56         
       
    57         // got a line? then hand it to the user callback
       
    58         if (line)
       
    59             lp->cb_read(lp, line, lp->cb_arg);
       
    60 
       
    61     } while (line); // a NULL line ends the loop
       
    62 
       
    63     // reschedule: ask for the next read event, exiting via errx if that fails
       
    64     if (line_proto_schedule_events(lp, EV_READ))
       
    65         errx(1, "line_proto_schedule_events: %s", error_msg(&lp->err));
       
    66 }
       
    67 
       
    68 /*
       
    69  * Schedule our sock_stream callback
       
        *
        * Enables the events in `what` (the on_read handler passes EV_READ) on
        * lp->sock via sock_stream_event_enable().
        *
        * Returns SUCCESS when sock_stream_event_enable() returns zero. Otherwise
        * the stream's error is handed to RETURN_SET_ERROR_INFO together with
        * &lp->err; presumably that macro stores it and returns its error code -
        * confirm against the macro definition.
    70  */
       
    71 static err_t line_proto_schedule_events (struct line_proto *lp, short what) 
       
    72 {
       
    73     // just use sock_stream's interface
       
    74     if (sock_stream_event_enable(lp->sock, what))
       
    75         RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
       
    76 
       
    77     return SUCCESS;
       
    78 }
       
    79 
       
    80 struct sock_stream_callbacks line_proto_sock_stream_callbacks = { // callback table passed to sock_stream_event_init() in line_proto_create
       
    81     .on_read    = &line_proto_on_read, // the only callback set here
       
    82 };
       
    83 
       
    84 err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, size_t buf_size,
       
    85         line_proto_read_cb cb_func, void *cb_arg, struct error_info *err)
    26 {
    86 {
    27     struct line_proto *lp;
    87     struct line_proto *lp;
    28 
    88 
    29     // allocate
    89     // allocate
    30     if ((lp = calloc(1, sizeof(*lp))) == NULL)
    90     if ((lp = calloc(1, sizeof(*lp))) == NULL)
    31         return SET_ERROR(err, ERR_CALLOC);
    91         return SET_ERROR(err, ERR_CALLOC);
       
    92     
       
    93     // allocate buffer
       
    94     if ((lp->buf = malloc(buf_size)) == NULL) {
       
    95         free(lp);
       
    96         return SET_ERROR(err, ERR_CALLOC);
       
    97     }
    32 
    98 
    33     // store
    99     // store
    34     lp->sock = sock;
   100     lp->sock = sock;
       
   101     lp->buf_len = buf_size;
       
   102     lp->cb_read = cb_func;
       
   103     lp->cb_arg = cb_arg;
       
   104 
       
   105     // initialize event-based stuff
       
   106     sock_stream_event_init(sock, &line_proto_sock_stream_callbacks, lp);
    35 
   107 
    36     // return
   108     // return
    37     *lp_ptr = lp;
   109     *lp_ptr = lp;
    38 
   110 
    39     return SUCCESS;
   111     return SUCCESS;
    73 
   145 
    74     // return offset to next line
   146     // return offset to next line
    75     return i + 2;
   147     return i + 2;
    76 }
   148 }
    77 
   149 
    78 err_t line_proto_read (struct line_proto *lp, char *buf, size_t len)
   150 err_t line_proto_read (struct line_proto *lp, const char **line_ptr)
    79 {
   151 {
    80     // offset to recv() new data into, offset to _parse_line hint, offset to next line from _parse_line
   152     // offset to recv() new data into, offset to _parse_line hint, offset to next line from _parse_line
    81     size_t recv_offset = 0, peek_offset = 0, next_offset = 0;
   153     size_t recv_offset = 0, peek_offset = 0, next_offset = 0;
    82     int ret;
   154     int ret;
    83 
   155 
    85     recv_offset = lp->tail_len;
   157     recv_offset = lp->tail_len;
    86 
   158 
    87     // move trailing data from previous line to front of buffer
   159     // move trailing data from previous line to front of buffer
    88     if (lp->tail_offset) {
   160     if (lp->tail_offset) {
    89         // move to front
   161         // move to front
    90         memmove(buf, buf + lp->tail_offset, lp->tail_len);
   162         memmove(lp->buf, lp->buf + lp->tail_offset, lp->tail_len);
    91 
   163 
    92         // reset
   164         // reset
    93         lp->tail_offset = 0;
   165         lp->tail_offset = 0;
    94         lp->tail_len = 0;
   166         lp->tail_len = 0;
    95     }
   167     }
    96 
   168 
    97     // readline loop
   169     // readline loop
    98     do {
   170     do {
    99         // parse any line at the beginning of the buffer
   171         // parse any line at the beginning of the buffer
   100         if ((next_offset = _parse_line(buf, recv_offset, &peek_offset)) > 0)
   172         if ((next_offset = _parse_line(lp->buf, recv_offset, &peek_offset)) > 0) {
       
   173             // store a valid *line_ptr
       
   174             *line_ptr = lp->buf;
       
   175             
       
   176             // exit loop and return
   101             break;
   177             break;
   102 
   178         }
   103         // ensure there's enough space for it
   179 
   104         assert(recv_offset < len);
   180         // ensure there's enough space for the rest of the line
       
   181         // XXX: this must be an error, not an assert
       
   182         assert(recv_offset < lp->buf_len);
   105         
   183         
   106         // otherwise, read more data
   184         // otherwise, read more data
   107         if ((ret = sock_stream_read(lp->sock, buf + recv_offset, len - recv_offset)) < 0)
   185         if ((ret = sock_stream_read(lp->sock, lp->buf + recv_offset, lp->buf_len - recv_offset)) < 0) {
   108             RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
   186             // we can special-case EAGAIN, as it's expected
       
   187             if (MATCH_ERROR(sock_stream_error(lp->sock), ERR_READ, EAGAIN)) {
       
   188                 // return a NULL *line_ptr
       
   189                 *line_ptr = NULL;
       
   190                 break;
       
   191 
       
   192             } else {
       
   193                 // store and return NULL on errors
       
   194                 RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
       
   195 
       
   196             }
       
   197         }
   109 
   198 
   110         // EOF?
   199         // EOF?
   111         if (ret == 0)
   200         if (ret == 0)
   112             return SET_ERROR(&lp->err, ERR_READ_EOF);
   201             return SET_ERROR(&lp->err, ERR_READ_EOF);
   113         
   202