src/line_proto.c
changeset 19 8c80580ccde9
parent 18 dedf137b504f
child 20 d9c4c2980a0d
equal deleted inserted replaced
18:dedf137b504f 19:8c80580ccde9
    12  */
    12  */
    13 struct line_proto {
    13 struct line_proto {
    14     /* The sock_stream we read/write with */
    14     /* The sock_stream we read/write with */
    15     struct sock_stream *sock;
    15     struct sock_stream *sock;
    16 
    16 
    17     /* The incoming line buffer */
    18     /* The incoming and outgoing line buffers */
    18     char *buf;
    18     char *in_buf, *out_buf;
    19 
    19 
    20     /* Incoming buffer size */
    20     /* Buffer size (same for both) */
    21     size_t buf_len;
    21     size_t buf_len;
    22 
    22 
    23     /* Offset of trailing data in buf */
    23     /* Offset of trailing data in buf */
    24     size_t tail_offset;
    24     size_t tail_offset;
    25 
    25 
    26     /* Length of trailing data in buf, if any */
    26     /* Length of trailing data in buf, if any */
    27     size_t tail_len;
    27     size_t tail_len;
       
    28 
       
    29     /* Amount of data in the out buffer */
       
    30     size_t out_offset;
    28 
    31 
    29     /* Last error */
    32     /* Last error */
    30     struct error_info err;
    33     struct error_info err;
    31 
    34 
    32     /* Callback info */
    35     /* Callback info */
    48     // sanity-check
    51     // sanity-check
    49     assert(lp->tail_offset < lp->buf_len);
    52     assert(lp->tail_offset < lp->buf_len);
    50     
    53     
    51     do {
    54     do {
    52         // attempt to read a line
    55         // attempt to read a line
    53         if (line_proto_read(lp, &line))
    56         if (line_proto_recv(lp, &line))
    54             // XXX: fail
    57             // XXX: fail
    55             errx(1, "line_proto_read: %s", error_msg(&lp->err));
    58             errx(1, "line_proto_recv: %s", error_msg(&lp->err));
    56         
    59         
    57         // got a line?
    60         // got a line?
    58         if (line)
    61         if (line)
    59             lp->cb_read(line, lp->cb_arg);
    62             lp->cb_read(line, lp->cb_arg);
    60 
    63 
    64     if (line_proto_schedule_events(lp, EV_READ))
    67     if (line_proto_schedule_events(lp, EV_READ))
    65         errx(1, "line_proto_schedule_events: %s", error_msg(&lp->err));
    68         errx(1, "line_proto_schedule_events: %s", error_msg(&lp->err));
    66 }
    69 }
    67 
    70 
    68 /*
    71 /*
       
    72  * Signal for write
       
    73  */
       
    74 static void line_proto_on_write (struct sock_stream *sock, void *arg)
       
    75 {
       
    76     struct line_proto *lp = arg;
       
    77     err_t err;
       
    78 
       
    79     // just flush
       
    80     if ((err = line_proto_flush(lp)) < 0)
       
    81         errx(1, "line_proto_flush: %s", error_name(err));
       
    82 }
       
    83 
       
    84 /*
    69  * Schedule our sock_stream callback
    85  * Schedule our sock_stream callback
    70  */
    86  */
    71 static err_t line_proto_schedule_events (struct line_proto *lp, short what) 
    87 static err_t line_proto_schedule_events (struct line_proto *lp, short what) 
    72 {
    88 {
    73     // just use sock_stream's interface
    89     // just use sock_stream's interface
    84 err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, size_t buf_size,
   100 err_t line_proto_create (struct line_proto **lp_ptr, struct sock_stream *sock, size_t buf_size,
    85         line_proto_read_cb cb_func, void *cb_arg, struct error_info *err)
   101         line_proto_read_cb cb_func, void *cb_arg, struct error_info *err)
    86 {
   102 {
    87     struct line_proto *lp;
   103     struct line_proto *lp;
    88 
   104 
    89     // allocate
   105     // allocate struct and buffers
    90     if ((lp = calloc(1, sizeof(*lp))) == NULL)
   106     if (
    91         return SET_ERROR(err, ERR_CALLOC);
   107             (lp = calloc(1, sizeof(*lp))) == NULL
    92     
   108         ||  (lp->in_buf = malloc(buf_size)) == NULL
    93     // allocate buffer
   109         ||  (lp->out_buf = malloc(buf_size)) == NULL
    94     if ((lp->buf = malloc(buf_size)) == NULL) {
   110     )
    95         free(lp);
   111         JUMP_SET_ERROR(err, ERR_CALLOC);
    96         return SET_ERROR(err, ERR_CALLOC);
       
    97     }
       
    98 
   112 
    99     // store
   113     // store
   100     lp->sock = sock;
   114     lp->sock = sock;
   101     lp->buf_len = buf_size;
   115     lp->buf_len = buf_size;
   102     lp->cb_read = cb_func;
   116     lp->cb_read = cb_func;
   111 
   125 
   112     // return
   126     // return
   113     *lp_ptr = lp;
   127     *lp_ptr = lp;
   114 
   128 
   115     return SUCCESS;
   129     return SUCCESS;
       
   130 
       
   131 error:
       
   132     if (lp) {
       
   133         free(lp->in_buf);
       
   134         free(lp->out_buf);
       
   135     }
       
   136 
       
   137     free(lp);
   116 }
   138 }
   117 
   139 
   118 /*
   140 /*
   119  * This looks for a full '\r\n' terminated line at the beginning of the given buffer. If found, the \r\n will be
   141  * This looks for a full '\r\n' terminated line at the beginning of the given buffer. If found, the \r\n will be
   120  * replaced with a '\0', and the offset to the beginning of the next line returned. If not found, zero is returned
   142  * replaced with a '\0', and the offset to the beginning of the next line returned. If not found, zero is returned
   156 
   178 
   157     // return offset to next line, as set in loop based on delim
   179     // return offset to next line, as set in loop based on delim
   158     return next;
   180     return next;
   159 }
   181 }
   160 
   182 
   161 err_t line_proto_read (struct line_proto *lp, char **line_ptr)
   183 err_t line_proto_recv (struct line_proto *lp, char **line_ptr)
   162 {
   184 {
   163     // offset to recv() new data into, offset to _parse_line hint, offset to next line from _parse_line
   185     // offset to recv() new data into, offset to _parse_line hint, offset to next line from _parse_line
   164     size_t recv_offset = 0, peek_offset = 0, next_offset = 0;
   186     size_t recv_offset = 0, peek_offset = 0, next_offset = 0;
   165     int ret;
   187     int ret;
   166 
   188 
   168     recv_offset = lp->tail_len;
   190     recv_offset = lp->tail_len;
   169 
   191 
   170     // move trailing data from previous line to front of buffer
   192     // move trailing data from previous line to front of buffer
   171     if (lp->tail_offset) {
   193     if (lp->tail_offset) {
   172         // move to front
   194         // move to front
   173         memmove(lp->buf, lp->buf + lp->tail_offset, lp->tail_len);
   195         memmove(lp->in_buf, lp->in_buf + lp->tail_offset, lp->tail_len);
   174 
   196 
   175         // reset
   197         // reset
   176         lp->tail_offset = 0;
   198         lp->tail_offset = 0;
   177         lp->tail_len = 0;
   199         lp->tail_len = 0;
   178     }
   200     }
   179 
   201 
   180     // readline loop
   202     // readline loop
   181     do {
   203     do {
   182         // parse any line at the beginning of the buffer
   204         // parse any line at the beginning of the buffer
   183         if ((next_offset = _parse_line(lp->buf, recv_offset, &peek_offset)) > 0) {
   205         if ((next_offset = _parse_line(lp->in_buf, recv_offset, &peek_offset)) > 0) {
   184             // store a valid *line_ptr
   206             // store a valid *line_ptr
   185             *line_ptr = lp->buf;
   207             *line_ptr = lp->in_buf;
   186             
   208             
   187             // exit loop and return
   209             // exit loop and return
   188             break;
   210             break;
   189         }
   211         }
   190 
   212 
   191         // ensure there's enough space for the rest of the line
   213         // ensure there's enough space for the rest of the line
   192         // XXX: this must be an error, not an assert
   214         if (recv_offset >= lp->buf_len)
   193         assert(recv_offset < lp->buf_len);
   215             return ERR_LINE_TOO_LONG;
   194         
   216         
   195         // otherwise, read more data
   217         // otherwise, read more data
   196         if ((ret = sock_stream_read(lp->sock, lp->buf + recv_offset, lp->buf_len - recv_offset)) < 0)
   218         if ((ret = sock_stream_read(lp->sock, lp->in_buf + recv_offset, lp->buf_len - recv_offset)) < 0)
   197             // store and return NULL on errors
   219             // store and return NULL on errors
   198             RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
   220             RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
   199 
   221 
   200         // EAGAIN?
   222         // EAGAIN?
   201         if (ret == 0) {
   223         if (ret == 0) {
   215 
   237 
   216     // ok
   238     // ok
   217     return SUCCESS;
   239     return SUCCESS;
   218 }
   240 }
   219 
   241 
   220 int line_proto_write (struct line_proto *lp, const char *line)
   242 int line_proto_send (struct line_proto *lp, const char *line)
   221 {
   243 {
   222     int ret;
   244     int ret;
   223     size_t len = strlen(line);
   245     size_t len = strlen(line);
   224 
   246 
   225     // XXX: no output buffers for now :)
   247     // drop line if we already have output buffered
   226     if ((ret = sock_stream_write(lp->sock, line, len)) < 0)
   248     if (lp->out_offset)
   227         RETURN_SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
   249         return -ERR_WRITE_EOF;
       
   250     
       
   251     // try and write the line
       
   252     if ((ret = sock_stream_write(lp->sock, line, len)) < 0) {
       
   253         SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
       
   254 
       
   255         return -ERROR_CODE(&lp->err);
       
   256     }
   228 
   257 
   229     // EAGAIN or partial?
   258     // EAGAIN or partial?
   230     if (ret < len) {
   259     if (ret < len) {
   231         // XXX: ugly hack, need partial line buffering
   260         size_t trailing = len - ret;
   232         return -1;
   261 
   233     }
    262         // ensure the unsent remainder fits in the output buffer
       
   263         if (trailing > lp->buf_len)
       
   264             return -ERR_LINE_TOO_LONG;
       
   265 
       
   266         // copy remaining portion to buffer
       
   267         memcpy(lp->out_buf, line + ret, trailing);
       
   268 
       
   269         // update offset
       
   270         lp->out_offset = trailing;
       
   271         
       
   272         // register for EV_WRITE
       
   273         line_proto_schedule_events(lp, EV_READ | EV_WRITE);
       
   274 
       
   275         // buffered...
       
   276         return 1;
       
   277 
       
   278     } else {
       
   279         // ok, no buffering needed
       
   280         return SUCCESS;
       
   281 
       
   282     }
       
   283 }
       
   284 
       
   285 int line_proto_flush (struct line_proto *lp)
       
   286 {
       
   287     int ret;
       
   288 
       
    289     // try to write out the buffered output
       
   290     if ((ret = sock_stream_write(lp->sock, lp->out_buf, lp->out_offset)) < 0) {
       
   291         SET_ERROR_INFO(&lp->err, sock_stream_error(lp->sock));
       
   292 
       
   293         return -ERROR_CODE(&lp->err);
       
   294     }
       
   295 
       
   296     // empty now?
       
   297     if (ret == lp->out_offset) {
       
   298         lp->out_offset = 0;
       
   299 
       
   300         return SUCCESS;
       
   301     }
       
   302 
       
   303     // partial?
       
   304     if (ret > 0) {
       
   305         size_t remaining = lp->out_offset - ret;
       
   306 
       
   307         // move the rest up
       
   308         memmove(lp->out_buf, lp->out_buf + ret, remaining);
       
   309 
       
   310         // update offset
       
   311         lp->out_offset = remaining;
       
   312     }
       
   313 
       
   314     // register for EV_WRITE
       
   315     line_proto_schedule_events(lp, EV_READ | EV_WRITE);
   234 
   316 
   235     // ok
   317     // ok
   236     return SUCCESS;
   318     return 1;
   237 }
   319 }
   238 
   320 
   239 const struct error_info* line_proto_error (struct line_proto *lp)
   321 const struct error_info* line_proto_error (struct line_proto *lp)
   240 {
   322 {
   241     // return pointer
   323     // return pointer
   242     return &lp->err;
   324     return &lp->err;
   243 }
   325 }
       
   326