coro_test.c
changeset 47 a5c09677ca6f
equal deleted inserted replaced
46:8a832c0e01ee 47:a5c09677ca6f
       
     1 
       
     2 // splice
       
     3 #define _GNU_SOURCE
       
     4 #include <fcntl.h>
       
     5 
       
     6 #include <unistd.h>
       
     7 #include <stdlib.h>
       
     8 #include <signal.h>
       
     9 #include <errno.h>
       
    10 
       
    11 #include <event2/util.h>
       
    12 #include <event2/event.h>
       
    13 #include <event2/event_compat.h>
       
    14 #include <event2/event_struct.h>
       
    15 #include <pcl.h>
       
    16 
       
    17 #include "common.h"
       
    18 #include "config.h"
       
    19 #include "socket.h"
       
    20 
       
    21 // what port to listen on
       
    22 #define LISTEN_PORT 10559
       
    23 
       
    24 // stack size to use for coroutines
       
    25 #define CLIENT_STACK_SIZE 128 * 1024
       
    26 #define PIPELINE_STACK_SIZE 128 * 1024
       
    27 
       
    28 // this should be equal to the system page size
       
    29 #define PIPELINE_CHUNKSIZE 4096
       
    30 
       
    31 struct co_connect_info {
       
    32     // what co this is running as
       
    33     coroutine_t co;
       
    34 
       
    35     // set to the value of errno in case of an error, zero otherwise
       
    36     int error;
       
    37 };
       
    38 
       
    39 static void _co_connect_write (evutil_socket_t fd, short what, void *arg) {
       
    40     struct co_connect_info *ctx = arg;
       
    41     
       
    42     // retreive the error code
       
    43     // XXX: this function was lost in a hg-fuckup
       
    44     if (socket_connect_error(fd, &ctx->error))
       
    45         ctx->error = -1;   // mark it as failed
       
    46     
       
    47     // resume the co
       
    48     co_call(ctx->co);
       
    49 }
       
    50 
       
    51 // coroutine-connect
       
    52 int co_connect (struct config_endpoint *endpoint, int sock_type, int *sock) {
       
    53     // our state
       
    54     struct co_connect_info ctx;
       
    55 
       
    56     // the event for the connect
       
    57     struct event ev;
       
    58 
       
    59     // clean it
       
    60     memset(&ctx, 0, sizeof(ctx));
       
    61     
       
    62     // initiate the connect()
       
    63     if ((*sock = socket_connect_async(endpoint, sock_type)) == -1)
       
    64         goto error;
       
    65     
       
    66     // set up the event
       
    67     event_set(&ev, *sock, EV_WRITE, &_co_connect_write, &ctx);
       
    68 
       
    69     if (event_add(&ev, NULL))
       
    70         goto error;
       
    71     
       
    72     // store our co handle in there
       
    73     ctx.co = co_current();
       
    74 
       
    75     // wait for the result
       
    76     co_resume();
       
    77     
       
    78     // return the error code
       
    79     return ctx.error;
       
    80 
       
    81 error:
       
    82     return -1;
       
    83 }
       
    84 
       
    85 struct co_splice_info {
       
    86     int fd_in, fd_out;
       
    87     size_t len;
       
    88 
       
    89     short wait;
       
    90 
       
    91     struct event ev_read, ev_write;
       
    92 
       
    93     int error;
       
    94 
       
    95     coroutine_t co;
       
    96 };
       
    97 
       
    98 void _co_splice_ev (int fd, short what, void *arg) {
       
    99     struct co_splice_info *ctx = arg;
       
   100     
       
   101     ctx->wait &= ~what;
       
   102 
       
   103     if (!ctx->wait) {
       
   104         // both ends are readable/writeable!
       
   105 
       
   106         long ret;
       
   107 
       
   108         if ((ret = splice(ctx->fd_in, NULL, ctx->fd_out, NULL, ctx->len, SPLICE_F_MOVE | SPLICE_F_NONBLOCK)) == -1) {
       
   109             if (errno == EAGAIN) {
       
   110                 ctx->error = 0;
       
   111                 ctx->len = 0;
       
   112 
       
   113             } else {
       
   114                 PERROR("splice");
       
   115             }
       
   116         } else {
       
   117             ctx->error = 0;
       
   118             ctx->len = ret;
       
   119         }
       
   120 
       
   121         // let the co do it's thing
       
   122         co_call(ctx->co);
       
   123 
       
   124     }
       
   125     
       
   126 
       
   127     // ok, done succesfully
       
   128     return;
       
   129 
       
   130 error:
       
   131     // let the co handle this error
       
   132     ctx->error = errno;
       
   133     co_call(ctx->co);
       
   134 }
       
   135 
       
   136 // coroutine-splice
       
   137 int co_splice (int fd_in, int fd_out, size_t *len, int *error) {
       
   138     // first, try and splice directly
       
   139     // XXX: ensure all sockets are O_NONBLOCK!
       
   140     long ret;
       
   141 
       
   142     if ((ret = splice(fd_in, NULL, fd_out, NULL, *len, SPLICE_F_MOVE | SPLICE_F_NONBLOCK)) == -1) {
       
   143         if (errno == EAGAIN) {
       
   144             // wait for in/out to become readable/writeable
       
   145             
       
   146             // our state
       
   147             ZINIT(struct co_splice_info, ctx);
       
   148 
       
   149             ctx.fd_in = fd_in;
       
   150             ctx.fd_out = fd_out;
       
   151             ctx.len = *len;
       
   152             ctx.wait = EV_READ | EV_WRITE;
       
   153             ctx.co = co_current();
       
   154             
       
   155             // two events
       
   156             event_set(&ctx.ev_read, fd_in, EV_READ, &_co_splice_ev, &ctx);
       
   157             event_set(&ctx.ev_write, fd_out, EV_WRITE, &_co_splice_ev, &ctx);
       
   158 
       
   159             // add both
       
   160             if (
       
   161                     event_add(&ctx.ev_read, NULL)
       
   162                 ||  event_add(&ctx.ev_write, NULL)
       
   163             )
       
   164                 ERROR("event_add"); // XXX: cleanup
       
   165             
       
   166             // and wait...
       
   167             co_resume();
       
   168             
       
   169             // error
       
   170             if (ctx.error) {
       
   171                 *error = ctx.error;
       
   172 
       
   173                 return -1;
       
   174 
       
   175             } else {
       
   176                 *len = ctx.len;
       
   177 
       
   178                 return 0;
       
   179             }
       
   180         } else
       
   181             PERROR("splice");
       
   182     } else {
       
   183         // success!
       
   184         *len = ret;
       
   185         
       
   186         return 0;
       
   187     }
       
   188 
       
   189 error:
       
   190     return -1;
       
   191 }
       
   192 
       
   193 /*
       
   194 struct co_pipeline_info {
       
   195     int sock_read, sock_write;
       
   196 
       
   197     int pipe_read, pipe_write;
       
   198     
       
   199     / *
       
   200      * Two coroutines:
       
   201      *  *   co_write waits for co_read to indicate that there is enough data in the pipe to send, whereupon it will
       
   202      *      attempt to splice data from pipe_read to sock_write.
       
   203      *  *   co_read waits for data to become available on sock_src, splices it into pipe_write, and notifies co_write
       
   204      *      that there is data avilable for sending.
       
   205      * /
       
   206     coroutine_t co_read, co_write;
       
   207 
       
   208     char co_read_stack[PIPELINE_STACK_SIZE], co_write_stack[PIPELINE_STACK_SIZE];
       
   209 };
       
   210 
       
   211 void _co_pipeline_read (void *arg) {
       
   212     
       
   213 }
       
   214 
       
   215 void _co_pipeline_write (void *arg) {
       
   216 
       
   217 }
       
   218 */
       
   219 
       
   220 int co_pipeline (int sock_read, int sock_write) {
       
   221 //    ZINIT(struct co_pipeline_info, ctx);
       
   222     
       
   223     // create our pipe
       
   224     int pipefds[2];
       
   225 
       
   226     if (pipe(pipefds))
       
   227         PERROR("pipe");
       
   228     
       
   229     // copy over the fds into ctx
       
   230     int pipe_read = pipefds[0];
       
   231     int pipe_write = pipefds[1];
       
   232 
       
   233     // start shuffling data around
       
   234     size_t pipe_size = 0, read_len, write_len;
       
   235     int error = 0;
       
   236 
       
   237     while (1) {
       
   238         read_len = PIPELINE_CHUNKSIZE;
       
   239 
       
   240         if (co_splice(sock_read, pipe_write, &read_len, &error))
       
   241             PERROR("co_splice(sock_read, pipe_write, %zu)", read_len);
       
   242         
       
   243         if (!read_len) {
       
   244             // EOF
       
   245             break;
       
   246         }
       
   247 
       
   248         pipe_size += read_len;
       
   249 
       
   250         if (!pipe_size)
       
   251             continue;
       
   252 
       
   253         while (pipe_size > 0) {
       
   254             write_len = pipe_size;
       
   255 
       
   256             if (co_splice(pipe_read, sock_write, &write_len, &error))
       
   257                 PERROR("co_splice(pipe_read, sock_write, %zu", write_len);
       
   258 
       
   259             pipe_size -= write_len;
       
   260         }
       
   261     }
       
   262 
       
   263 /*
       
   264     // create the two coros
       
   265     if (
       
   266             (ctx.co_read = co_create(&_co_pipeline_read, &ctx, ctx.co_read_stack, PIPELINE_STACK_SIZE)) == NULL
       
   267         ||  (ctx.co_write = co_create(&_co_pipeline_write, &ctx, ctx.co_write_stack, PIPELINE_STACK_SIZE)) == NULL
       
   268     ) 
       
   269         ERROR("co_create");
       
   270     
       
   271     // first start off the read one, which then eventually calls the write one
       
   272     co_call(ctx.co_read);
       
   273 */
       
   274 
       
   275     // success
       
   276     return 0;
       
   277 
       
   278 error:
       
   279 /*
       
   280     if (ctx.co_read)
       
   281         co_delete(ctx.co_read);
       
   282 
       
   283     if (ctx.co_write)
       
   284         co_delete(ctx.co_write);
       
   285 */
       
   286     return -1;
       
   287 }
       
   288 
       
   289 /*
       
   290  * State needed to handle a client
       
   291  */
       
   292 struct client_info {
       
   293     // the client socket
       
   294     evutil_socket_t socket;
       
   295 
       
   296     // the connect target endpoint
       
   297     struct config_endpoint *connect_target;
       
   298 
       
   299     // the coroutine that handles it
       
   300     coroutine_t co;
       
   301 };
       
   302 
       
   303 void client_co (void *arg) {
       
   304     struct client_info *ctx = arg;
       
   305     
       
   306     // the outbound socket
       
   307     int conn_sock = -1;
       
   308 
       
   309     // connect
       
   310     if (co_connect(ctx->connect_target, SOCK_STREAM, &conn_sock))
       
   311         goto error;
       
   312     
       
   313     // pipe data around \o/
       
   314     if (co_pipeline(ctx->socket, conn_sock))
       
   315         goto error;
       
   316 
       
   317 error:
       
   318     // close the sockets
       
   319     if (conn_sock != -1)
       
   320         if (close(conn_sock))
       
   321             PWARNING("close connect_socket");
       
   322     
       
   323     if (close(ctx->socket))
       
   324         PWARNING("close listen_socket");
       
   325     
       
   326     // free the ctx
       
   327     free(ctx);
       
   328 
       
   329     // exit
       
   330     co_exit();
       
   331 }
       
   332 
       
   333 static void handle_accept (evutil_socket_t fd, short event, void *arg) {
       
   334     // the new client's client_info
       
   335     struct client_info *ctx = NULL;
       
   336     
       
   337     // the socket from accept()
       
   338     evutil_socket_t socket = -1;
       
   339 
       
   340     // not used
       
   341     struct sockaddr_storage addr;
       
   342     socklen_t addr_len;
       
   343     
       
   344     // arg is NULL and unused
       
   345     (void) arg;
       
   346     
       
   347     // accept the connection
       
   348     addr_len = sizeof(struct sockaddr_storage);
       
   349 
       
   350     if ((socket = accept(fd, (struct sockaddr *) &addr, &addr_len)) == -1)
       
   351         PERROR("accept");
       
   352     
       
   353     // alloc a new client_info
       
   354     if (!(ctx = calloc(1, sizeof(*ctx))))
       
   355         ERROR("calloc");
       
   356     
       
   357     // store the socket
       
   358     ctx->socket = socket;
       
   359 
       
   360     // and the endpoint
       
   361     ctx->connect_target = arg;
       
   362 
       
   363     // create the coroutine
       
   364     if ((ctx->co = co_create(&client_co, ctx, NULL, CLIENT_STACK_SIZE)) == NULL)
       
   365         ERROR("co_create");
       
   366 
       
   367     // we can start up the coroutine right away
       
   368     co_call(ctx->co);
       
   369     
       
   370     // done handling this accept
       
   371     return;
       
   372 
       
   373 error:
       
   374     if (ctx) {
       
   375         if (ctx->co)
       
   376             co_delete(ctx->co);
       
   377 
       
   378         free(ctx);
       
   379     }
       
   380 
       
   381     else if (socket != -1)
       
   382         close(socket);
       
   383 }
       
   384 
       
   385 
       
   386 int main (int argc, char **argv) {
       
   387      // libevent init
       
   388     struct event_base *ev_base = event_init();
       
   389 
       
   390     if (!ev_base)
       
   391         FATAL("event_init");
       
   392     
       
   393     // process command-line arguments
       
   394     int opt;
       
   395     const char *listen_spec = NULL, *connect_spec = NULL;
       
   396 
       
   397     while ((opt = getopt(argc, argv, "hl:c:")) != -1) {
       
   398         switch (opt) {
       
   399             case 'l':
       
   400                 if (listen_spec)
       
   401                     FATAL("only specify -l once");
       
   402 
       
   403                 listen_spec = optarg;
       
   404 
       
   405                 break;
       
   406 
       
   407             case 'c':
       
   408                 if (connect_spec)
       
   409                     FATAL("only specify -c once");
       
   410                 
       
   411                 connect_spec = optarg;
       
   412 
       
   413                 break;
       
   414 
       
   415             case 'h':
       
   416             default:
       
   417                 err_exit("Usage: %s [-h] [-l listen] -c <connect>", argv[0]);
       
   418         
       
   419         }
       
   420     }
       
   421 
       
   422     if (!connect_spec)
       
   423         err_exit("Must specify -c; see `%s -h`", argv[0]);
       
   424     
       
   425     // ignore SIGPIPE
       
   426     struct sigaction sigpipe;
       
   427     memset(&sigpipe, 0, sizeof(sigpipe));
       
   428 
       
   429     sigpipe.sa_handler = SIG_IGN;
       
   430 
       
   431     sigaction(SIGPIPE, &sigpipe, NULL);
       
   432     
       
   433     // endpoints
       
   434     struct config_endpoint listen_endpoint, connect_endpoint;
       
   435     
       
   436     endpoint_init(&listen_endpoint, LISTEN_PORT);
       
   437     endpoint_init(&connect_endpoint, 0);
       
   438 
       
   439     if (
       
   440             endpoint_parse(&listen_endpoint, listen_spec)
       
   441         ||  endpoint_parse(&connect_endpoint, connect_spec)
       
   442     )
       
   443         goto error;
       
   444     
       
   445     // the listen socket
       
   446     int listen_sock;
       
   447 
       
   448     if ((listen_sock = socket_listen(&listen_endpoint, SOCK_STREAM)) == -1)
       
   449         goto error;
       
   450 
       
   451     // create the listen event
       
   452     struct event listen_ev;
       
   453 
       
   454     event_set(&listen_ev, listen_sock, EV_READ | EV_PERSIST, &handle_accept, &connect_endpoint);
       
   455 
       
   456     if (event_add(&listen_ev, NULL))
       
   457         PERROR("event_add");
       
   458 
       
   459     // we shall now run
       
   460     INFO("run");
       
   461     
       
   462     // run the libevent mainloop
       
   463     if (event_base_dispatch(ev_base))
       
   464         WARNING("event_dispatch");
       
   465 
       
   466     INFO("shutdown");
       
   467 
       
   468     // cleanup
       
   469     event_base_free(ev_base);
       
   470     close(listen_sock);
       
   471 
       
   472     return EXIT_SUCCESS;
       
   473 
       
   474 error:
       
   475     return EXIT_FAILURE;
       
   476 }
       
   477