src/lib/transport_fd.c
branch new-lib-errors
changeset 219 cefec18b8268
parent 182 471ca1e744da
equal deleted inserted replaced
218:5229a5d098b2 219:cefec18b8268
       
     1 #include "transport_fd.h"
       
     2 
       
     3 #include "log.h"
       
     4 
       
     5 #include <fcntl.h>
       
     6 #include <unistd.h>
       
     7 #include <assert.h>
       
     8 
       
     9 /**
       
    10  * Our libevent callback
       
    11  */
       
    12 static void transport_fd_on_event (evutil_socket_t _fd, short ev_what, void *arg) 
       
    13 {
       
    14     struct transport_fd *fd = arg;
       
    15 
       
    16     (void) _fd;
       
    17 
       
    18     short what = 0;
       
    19 
       
    20     // build flags
       
    21     if (ev_what & EV_READ)
       
    22         what |= TRANSPORT_READ;
       
    23 
       
    24     if (ev_what & EV_WRITE)
       
    25         what |= TRANSPORT_WRITE;
       
    26 
       
    27     // invoke user callback
       
    28     fd->cb_func(fd, what, fd->cb_arg);
       
    29 }
       
    30 
       
    31 /**
       
    32  * Our transport_methods implementations
       
    33  */
       
    34 err_t transport_fd__read (transport_t *transport, void *buf, size_t *len, error_t *err)
       
    35 {
       
    36     struct transport_fd *fd = transport_check(transport, &transport_fd_type);
       
    37     int ret;
       
    38 
       
    39     error_reset(err);
       
    40     
       
    41     // read(), and detect non-EAGAIN or EOF
       
    42     if ((ret = read(fd->fd, buf, *len)) < 0 && errno != EAGAIN)
       
    43         // unexpected error
       
    44         return SET_ERROR_ERRNO(err, &libc_errors, ERR_READ);
       
    45     
       
    46     else if (ret == 0)
       
    47         // EOF
       
    48         return SET_ERROR(err, &transport_errors, ERR_TRANSPORT_EOF);
       
    49 
       
    50 
       
    51     if (ret < 0) {
       
    52         // EAGAIN -> zero bytes
       
    53         *len = 0;
       
    54 
       
    55     } else {
       
    56         // normal -> bytes read
       
    57         *len = ret;
       
    58     }
       
    59 
       
    60     // ok
       
    61     return SUCCESS;
       
    62 }
       
    63 
       
    64 err_t transport_fd__write (transport_t *transport, const void *buf, size_t *len, error_t *err)
       
    65 {
       
    66     struct transport_fd *fd = transport_check(transport, &transport_fd_type);
       
    67     int ret;
       
    68     
       
    69     error_reset(err);
       
    70     
       
    71     // write(), and detect non-EAGAIN or EOF
       
    72     if ((ret = write(fd->fd, buf, *len)) < 0 && errno != EAGAIN)
       
    73         // unexpected error
       
    74         return SET_ERROR_ERRNO(err, &libc_errors, ERR_WRITE);
       
    75     
       
    76     else if (ret == 0)
       
    77         // EOF
       
    78         return SET_ERROR(err, &libc_errors, ERR_WRITE_EOF);
       
    79 
       
    80 
       
    81     if (ret < 0) {
       
    82         // EAGAIN -> zero bytes
       
    83         *len = 0;
       
    84 
       
    85         if (transport->info.ev_mask & TRANSPORT_WRITE)
       
    86             // enable the write event
       
    87             if ((ERROR_CODE(err) = transport_fd_enable(fd, TRANSPORT_WRITE)))
       
    88                 return ERROR_CODE(err);
       
    89 
       
    90     } else {
       
    91         // normal -> bytes read
       
    92         *len = ret;
       
    93     }
       
    94 
       
    95     return SUCCESS;
       
    96 }
       
    97 
       
    98 err_t transport_fd__events (transport_t *transport, short ev_mask, error_t *err)
       
    99 {
       
   100     struct transport_fd *fd = transport_check(transport, &transport_fd_type);
       
   101     
       
   102     short mask = 0;
       
   103 
       
   104     // enable read as requested
       
   105     if (ev_mask & TRANSPORT_READ)
       
   106         mask |= TRANSPORT_READ;
       
   107     
       
   108     // enable write if requested and it's currently enabled
       
   109     if ((ev_mask & TRANSPORT_WRITE) && event_pending(fd->ev_write, EV_WRITE, NULL))
       
   110         mask |= TRANSPORT_WRITE;
       
   111 
       
   112     // set
       
   113     return (ERROR_CODE(err) = transport_fd_events(fd, mask));
       
   114 }
       
   115 
       
   116 void transport_fd__deinit (transport_t *transport)
       
   117 {
       
   118     struct transport_fd *fd = transport_check(transport, &transport_fd_type);
       
   119 
       
   120     transport_fd_deinit(fd);
       
   121 }
       
   122 
       
   123 const struct transport_type transport_fd_type = {
       
   124     .base_type  = {
       
   125         .parent     = &transport_type_type,
       
   126     },
       
   127     .methods    = {
       
   128         .read       = transport_fd__read,
       
   129         .write      = transport_fd__write,
       
   130         .events     = transport_fd__events,
       
   131         .deinit     = transport_fd__deinit
       
   132     }
       
   133 };
       
   134 
       
   135 /**
       
   136  * Dummy callbacks
       
   137  */
       
   138 void transport_fd_callback_user (struct transport_fd *fd, short what, void *arg)
       
   139 {
       
   140     (void) arg;
       
   141     
       
   142     // proxy
       
   143     transport_invoke(TRANSPORT_FD_BASE(fd), what);
       
   144 }
       
   145 
       
   146 /**
       
   147  * Function implementations
       
   148  */
       
   149 void transport_fd_init (struct transport_fd *fd, struct event_base *ev_base, int _fd)
       
   150 {
       
   151     // sanity-check
       
   152     assert(!fd->fd);
       
   153     assert(!fd->ev_read && !fd->ev_write);
       
   154     assert(_fd == TRANSPORT_FD_INVALID || _fd >= 0);
       
   155 
       
   156     // initialize
       
   157     fd->ev_base = ev_base;
       
   158     fd->fd = _fd;
       
   159     fd->cb_func = fd->cb_arg = NULL;
       
   160 }
       
   161 
       
   162 err_t transport_fd_nonblock (struct transport_fd *fd, bool nonblock)
       
   163 {
       
   164     assert(fd->fd != TRANSPORT_FD_INVALID);
       
   165 
       
   166     // XXX: maintain old flags?
       
   167 
       
   168 
       
   169     // set new flags
       
   170     if (fcntl(fd->fd, F_SETFL, nonblock ? O_NONBLOCK : 0) < 0)
       
   171         return ERR_FCNTL;
       
   172 
       
   173     return SUCCESS;
       
   174 }
       
   175 
       
   176 /**
       
   177  * Install our internal event handler.
       
   178  *
       
   179  * The events should not already be set up.
       
   180  *
       
   181  * Cleans up partial events on errors
       
   182  */
       
   183 err_t transport_fd_install (struct transport_fd *fd)
       
   184 {
       
   185     assert(fd->fd != TRANSPORT_FD_INVALID);
       
   186     assert(!fd->ev_read && !fd->ev_write);
       
   187 
       
   188     // create new events
       
   189     if ((fd->ev_read = event_new(fd->ev_base, fd->fd, EV_READ | EV_PERSIST, transport_fd_on_event, fd)) == NULL)
       
   190         goto err_event_add;
       
   191 
       
   192     if ((fd->ev_write = event_new(fd->ev_base, fd->fd, EV_WRITE, transport_fd_on_event, fd)) == NULL)
       
   193         goto err_event_add;
       
   194     
       
   195     // ok
       
   196     return SUCCESS;
       
   197 
       
   198 err_event_add:
       
   199     // remove partial events
       
   200     transport_fd_clear(fd);
       
   201 
       
   202     return ERR_EVENT_NEW;
       
   203 }
       
   204 
       
   205 err_t transport_fd_setup (struct transport_fd *fd, transport_fd_callback_func cb_func, void *cb_arg)
       
   206 {
       
   207     // requires a valid fd
       
   208     assert(fd->fd != TRANSPORT_FD_INVALID);
       
   209     
       
   210     // store
       
   211     fd->cb_func = cb_func;
       
   212     fd->cb_arg = cb_arg;
       
   213     
       
   214     // install the event handlers?
       
   215     if (!fd->ev_read || !fd->ev_write)
       
   216         return transport_fd_install(fd);
       
   217     else
       
   218         return SUCCESS;
       
   219 }
       
   220 
       
   221 err_t transport_fd_enable (struct transport_fd *fd, short mask)
       
   222 {
       
   223     // just add the appropriate events
       
   224     if (mask & TRANSPORT_READ && event_add(fd->ev_read, NULL))
       
   225         return ERR_EVENT_ADD;
       
   226  
       
   227     if (mask & TRANSPORT_WRITE && event_add(fd->ev_write, NULL))
       
   228         return ERR_EVENT_ADD;
       
   229     
       
   230 
       
   231     return SUCCESS;
       
   232 }
       
   233 
       
   234 err_t transport_fd_disable (struct transport_fd *fd, short mask)
       
   235 {
       
   236     if (mask & TRANSPORT_READ && event_del(fd->ev_read))
       
   237         return ERR_EVENT_DEL;
       
   238 
       
   239     if (mask & TRANSPORT_WRITE && event_del(fd->ev_write))
       
   240         return ERR_EVENT_DEL;
       
   241 
       
   242 
       
   243     return SUCCESS;
       
   244 }
       
   245 
       
   246 err_t transport_fd_events (struct transport_fd *fd, short mask)
       
   247 {
       
   248     err_t err;
       
   249     
       
   250     // enable/disable read
       
   251     if (mask & TRANSPORT_READ)
       
   252         err = event_add(fd->ev_read, NULL);
       
   253     else
       
   254         err = event_del(fd->ev_read);
       
   255 
       
   256     if (err)
       
   257         return err;
       
   258 
       
   259     // enable/disable write
       
   260     if (mask & TRANSPORT_WRITE)
       
   261         err = event_add(fd->ev_write, NULL);
       
   262     else
       
   263         err = event_del(fd->ev_write);
       
   264 
       
   265     if (err)
       
   266         return err;
       
   267 
       
   268     // ok
       
   269     return SUCCESS;
       
   270 }
       
   271 
       
   272 /**
       
   273  * Remove our current ev_* events, but leave the cb_* intact.
       
   274  */
       
   275 static void transport_fd_remove (struct transport_fd *fd)
       
   276 {
       
   277     if (fd->ev_read)
       
   278         event_free(fd->ev_read);
       
   279 
       
   280     if (fd->ev_write)
       
   281         event_free(fd->ev_write);
       
   282     
       
   283     fd->ev_read = NULL;
       
   284     fd->ev_write = NULL;
       
   285 }
       
   286 
       
   287 void transport_fd_clear (struct transport_fd *fd)
       
   288 {
       
   289     // remove the events
       
   290     transport_fd_remove(fd);
       
   291     
       
   292     // clear the callbacks
       
   293     fd->cb_func = fd->cb_arg = NULL;
       
   294 }
       
   295 
       
   296 err_t transport_fd_defaults (struct transport_fd *fd)
       
   297 {
       
   298     error_t err;
       
   299 
       
   300     // install the transport_invoke callback handler
       
   301     if ((ERROR_CODE(&err) = transport_fd_setup(fd, transport_fd_callback_user, NULL)))
       
   302         goto error;
       
   303 
       
   304     // enable read unless masked out
       
   305     if (TRANSPORT_FD_BASE(fd)->info.ev_mask & TRANSPORT_READ) {
       
   306         if ((ERROR_CODE(&err) = transport_fd_enable(fd, TRANSPORT_READ)))
       
   307             goto error;
       
   308     }
       
   309     
       
   310     // ok
       
   311     return SUCCESS;
       
   312 
       
   313 error:
       
   314     return ERROR_CODE(&err);
       
   315 }
       
   316 
       
   317 err_t transport_fd_set (struct transport_fd *fd, int _fd, error_t *err)
       
   318 {
       
   319     assert(_fd == TRANSPORT_FD_INVALID || _fd >= 0);
       
   320 
       
   321     // close the old stuff
       
   322     if (transport_fd_close(fd, err))
       
   323         log_warn_error(err, "close");
       
   324     
       
   325     // set the new one
       
   326     fd->fd = _fd;
       
   327     
       
   328     // do we have callbacks that we need to setup?
       
   329     if (fd->cb_func)
       
   330         return transport_fd_install(fd);
       
   331 
       
   332     else 
       
   333         return SUCCESS;
       
   334 }
       
   335 
       
   336 void transport_fd_invoke (struct transport_fd *fd, short what)
       
   337 {
       
   338     // invoke
       
   339     transport_invoke(TRANSPORT_FD_BASE(fd), what);
       
   340 }
       
   341 
       
   342 err_t transport_fd_close (struct transport_fd *fd, error_t *err)
       
   343 {
       
   344     int _fd = fd->fd;
       
   345 
       
   346     // remove any installed events
       
   347     transport_fd_remove(fd);
       
   348 
       
   349     // invalidate fd
       
   350     fd->fd = TRANSPORT_FD_INVALID;
       
   351  
       
   352     // close the fd
       
   353     if (_fd != TRANSPORT_FD_INVALID && close(_fd))
       
   354         return SET_ERROR_ERRNO(err, &libc_errors, ERR_CLOSE);
       
   355 
       
   356     return SUCCESS;
       
   357 }
       
   358 
       
   359 void transport_fd_deinit (struct transport_fd *fd)
       
   360 {
       
   361     error_t err;
       
   362 
       
   363     // XXX: this might block
       
   364     if (transport_fd_close(fd, &err))
       
   365         log_warn_error(&err, "close");
       
   366 
       
   367 }
       
   368