terom@47: terom@47: // splice terom@47: #define _GNU_SOURCE terom@47: #include terom@47: terom@47: #include terom@47: #include terom@47: #include terom@47: #include terom@47: terom@47: #include terom@47: #include terom@47: #include terom@47: #include terom@47: #include terom@47: terom@47: #include "common.h" terom@47: #include "config.h" terom@47: #include "socket.h" terom@47: terom@47: // what port to listen on terom@47: #define LISTEN_PORT 10559 terom@47: terom@47: // stack size to use for coroutines terom@47: #define CLIENT_STACK_SIZE 128 * 1024 terom@47: #define PIPELINE_STACK_SIZE 128 * 1024 terom@47: terom@47: // this should be equal to the system page size terom@47: #define PIPELINE_CHUNKSIZE 4096 terom@47: terom@47: struct co_connect_info { terom@47: // what co this is running as terom@47: coroutine_t co; terom@47: terom@47: // set to the value of errno in case of an error, zero otherwise terom@47: int error; terom@47: }; terom@47: terom@47: static void _co_connect_write (evutil_socket_t fd, short what, void *arg) { terom@47: struct co_connect_info *ctx = arg; terom@47: terom@47: // retreive the error code terom@47: // XXX: this function was lost in a hg-fuckup terom@47: if (socket_connect_error(fd, &ctx->error)) terom@47: ctx->error = -1; // mark it as failed terom@47: terom@47: // resume the co terom@47: co_call(ctx->co); terom@47: } terom@47: terom@47: // coroutine-connect terom@47: int co_connect (struct config_endpoint *endpoint, int sock_type, int *sock) { terom@47: // our state terom@47: struct co_connect_info ctx; terom@47: terom@47: // the event for the connect terom@47: struct event ev; terom@47: terom@47: // clean it terom@47: memset(&ctx, 0, sizeof(ctx)); terom@47: terom@47: // initiate the connect() terom@47: if ((*sock = socket_connect_async(endpoint, sock_type)) == -1) terom@47: goto error; terom@47: terom@47: // set up the event terom@47: event_set(&ev, *sock, EV_WRITE, &_co_connect_write, &ctx); terom@47: terom@47: if (event_add(&ev, NULL)) terom@47: goto error; terom@47: terom@47: // store our co handle in there terom@47: ctx.co = co_current(); terom@47: terom@47: // wait for the result terom@47: co_resume(); terom@47: terom@47: // return the error code terom@47: return ctx.error; terom@47: terom@47: error: terom@47: return -1; terom@47: } terom@47: terom@47: struct co_splice_info { terom@47: int fd_in, fd_out; terom@47: size_t len; terom@47: terom@47: short wait; terom@47: terom@47: struct event ev_read, ev_write; terom@47: terom@47: int error; terom@47: terom@47: coroutine_t co; terom@47: }; terom@47: terom@47: void _co_splice_ev (int fd, short what, void *arg) { terom@47: struct co_splice_info *ctx = arg; terom@47: terom@47: ctx->wait &= ~what; terom@47: terom@47: if (!ctx->wait) { terom@47: // both ends are readable/writeable! terom@47: terom@47: long ret; terom@47: terom@47: if ((ret = splice(ctx->fd_in, NULL, ctx->fd_out, NULL, ctx->len, SPLICE_F_MOVE | SPLICE_F_NONBLOCK)) == -1) { terom@47: if (errno == EAGAIN) { terom@47: ctx->error = 0; terom@47: ctx->len = 0; terom@47: terom@47: } else { terom@47: PERROR("splice"); terom@47: } terom@47: } else { terom@47: ctx->error = 0; terom@47: ctx->len = ret; terom@47: } terom@47: terom@47: // let the co do it's thing terom@47: co_call(ctx->co); terom@47: terom@47: } terom@47: terom@47: terom@47: // ok, done succesfully terom@47: return; terom@47: terom@47: error: terom@47: // let the co handle this error terom@47: ctx->error = errno; terom@47: co_call(ctx->co); terom@47: } terom@47: terom@47: // coroutine-splice terom@47: int co_splice (int fd_in, int fd_out, size_t *len, int *error) { terom@47: // first, try and splice directly terom@47: // XXX: ensure all sockets are O_NONBLOCK! terom@47: long ret; terom@47: terom@47: if ((ret = splice(fd_in, NULL, fd_out, NULL, *len, SPLICE_F_MOVE | SPLICE_F_NONBLOCK)) == -1) { terom@47: if (errno == EAGAIN) { terom@47: // wait for in/out to become readable/writeable terom@47: terom@47: // our state terom@47: ZINIT(struct co_splice_info, ctx); terom@47: terom@47: ctx.fd_in = fd_in; terom@47: ctx.fd_out = fd_out; terom@47: ctx.len = *len; terom@47: ctx.wait = EV_READ | EV_WRITE; terom@47: ctx.co = co_current(); terom@47: terom@47: // two events terom@47: event_set(&ctx.ev_read, fd_in, EV_READ, &_co_splice_ev, &ctx); terom@47: event_set(&ctx.ev_write, fd_out, EV_WRITE, &_co_splice_ev, &ctx); terom@47: terom@47: // add both terom@47: if ( terom@47: event_add(&ctx.ev_read, NULL) terom@47: || event_add(&ctx.ev_write, NULL) terom@47: ) terom@47: ERROR("event_add"); // XXX: cleanup terom@47: terom@47: // and wait... terom@47: co_resume(); terom@47: terom@47: // error terom@47: if (ctx.error) { terom@47: *error = ctx.error; terom@47: terom@47: return -1; terom@47: terom@47: } else { terom@47: *len = ctx.len; terom@47: terom@47: return 0; terom@47: } terom@47: } else terom@47: PERROR("splice"); terom@47: } else { terom@47: // success! terom@47: *len = ret; terom@47: terom@47: return 0; terom@47: } terom@47: terom@47: error: terom@47: return -1; terom@47: } terom@47: terom@47: /* terom@47: struct co_pipeline_info { terom@47: int sock_read, sock_write; terom@47: terom@47: int pipe_read, pipe_write; terom@47: terom@47: / * terom@47: * Two coroutines: terom@47: * * co_write waits for co_read to indicate that there is enough data in the pipe to send, whereupon it will terom@47: * attempt to splice data from pipe_read to sock_write. terom@47: * * co_read waits for data to become available on sock_src, splices it into pipe_write, and notifies co_write terom@47: * that there is data avilable for sending. terom@47: * / terom@47: coroutine_t co_read, co_write; terom@47: terom@47: char co_read_stack[PIPELINE_STACK_SIZE], co_write_stack[PIPELINE_STACK_SIZE]; terom@47: }; terom@47: terom@47: void _co_pipeline_read (void *arg) { terom@47: terom@47: } terom@47: terom@47: void _co_pipeline_write (void *arg) { terom@47: terom@47: } terom@47: */ terom@47: terom@47: int co_pipeline (int sock_read, int sock_write) { terom@47: // ZINIT(struct co_pipeline_info, ctx); terom@47: terom@47: // create our pipe terom@47: int pipefds[2]; terom@47: terom@47: if (pipe(pipefds)) terom@47: PERROR("pipe"); terom@47: terom@47: // copy over the fds into ctx terom@47: int pipe_read = pipefds[0]; terom@47: int pipe_write = pipefds[1]; terom@47: terom@47: // start shuffling data around terom@47: size_t pipe_size = 0, read_len, write_len; terom@47: int error = 0; terom@47: terom@47: while (1) { terom@47: read_len = PIPELINE_CHUNKSIZE; terom@47: terom@47: if (co_splice(sock_read, pipe_write, &read_len, &error)) terom@47: PERROR("co_splice(sock_read, pipe_write, %zu)", read_len); terom@47: terom@47: if (!read_len) { terom@47: // EOF terom@47: break; terom@47: } terom@47: terom@47: pipe_size += read_len; terom@47: terom@47: if (!pipe_size) terom@47: continue; terom@47: terom@47: while (pipe_size > 0) { terom@47: write_len = pipe_size; terom@47: terom@47: if (co_splice(pipe_read, sock_write, &write_len, &error)) terom@47: PERROR("co_splice(pipe_read, sock_write, %zu", write_len); terom@47: terom@47: pipe_size -= write_len; terom@47: } terom@47: } terom@47: terom@47: /* terom@47: // create the two coros terom@47: if ( terom@47: (ctx.co_read = co_create(&_co_pipeline_read, &ctx, ctx.co_read_stack, PIPELINE_STACK_SIZE)) == NULL terom@47: || (ctx.co_write = co_create(&_co_pipeline_write, &ctx, ctx.co_write_stack, PIPELINE_STACK_SIZE)) == NULL terom@47: ) terom@47: ERROR("co_create"); terom@47: terom@47: // first start off the read one, which then eventually calls the write one terom@47: co_call(ctx.co_read); terom@47: */ terom@47: terom@47: // success terom@47: return 0; terom@47: terom@47: error: terom@47: /* terom@47: if (ctx.co_read) terom@47: co_delete(ctx.co_read); terom@47: terom@47: if (ctx.co_write) terom@47: co_delete(ctx.co_write); terom@47: */ terom@47: return -1; terom@47: } terom@47: terom@47: /* terom@47: * State needed to handle a client terom@47: */ terom@47: struct client_info { terom@47: // the client socket terom@47: evutil_socket_t socket; terom@47: terom@47: // the connect target endpoint terom@47: struct config_endpoint *connect_target; terom@47: terom@47: // the coroutine that handles it terom@47: coroutine_t co; terom@47: }; terom@47: terom@47: void client_co (void *arg) { terom@47: struct client_info *ctx = arg; terom@47: terom@47: // the outbound socket terom@47: int conn_sock = -1; terom@47: terom@47: // connect terom@47: if (co_connect(ctx->connect_target, SOCK_STREAM, &conn_sock)) terom@47: goto error; terom@47: terom@47: // pipe data around \o/ terom@47: if (co_pipeline(ctx->socket, conn_sock)) terom@47: goto error; terom@47: terom@47: error: terom@47: // close the sockets terom@47: if (conn_sock != -1) terom@47: if (close(conn_sock)) terom@47: PWARNING("close connect_socket"); terom@47: terom@47: if (close(ctx->socket)) terom@47: PWARNING("close listen_socket"); terom@47: terom@47: // free the ctx terom@47: free(ctx); terom@47: terom@47: // exit terom@47: co_exit(); terom@47: } terom@47: terom@47: static void handle_accept (evutil_socket_t fd, short event, void *arg) { terom@47: // the new client's client_info terom@47: struct client_info *ctx = NULL; terom@47: terom@47: // the socket from accept() terom@47: evutil_socket_t socket = -1; terom@47: terom@47: // not used terom@47: struct sockaddr_storage addr; terom@47: socklen_t addr_len; terom@47: terom@47: // arg is NULL and unused terom@47: (void) arg; terom@47: terom@47: // accept the connection terom@47: addr_len = sizeof(struct sockaddr_storage); terom@47: terom@47: if ((socket = accept(fd, (struct sockaddr *) &addr, &addr_len)) == -1) terom@47: PERROR("accept"); terom@47: terom@47: // alloc a new client_info terom@47: if (!(ctx = calloc(1, sizeof(*ctx)))) terom@47: ERROR("calloc"); terom@47: terom@47: // store the socket terom@47: ctx->socket = socket; terom@47: terom@47: // and the endpoint terom@47: ctx->connect_target = arg; terom@47: terom@47: // create the coroutine terom@47: if ((ctx->co = co_create(&client_co, ctx, NULL, CLIENT_STACK_SIZE)) == NULL) terom@47: ERROR("co_create"); terom@47: terom@47: // we can start up the coroutine right away terom@47: co_call(ctx->co); terom@47: terom@47: // done handling this accept terom@47: return; terom@47: terom@47: error: terom@47: if (ctx) { terom@47: if (ctx->co) terom@47: co_delete(ctx->co); terom@47: terom@47: free(ctx); terom@47: } terom@47: terom@47: else if (socket != -1) terom@47: close(socket); terom@47: } terom@47: terom@47: terom@47: int main (int argc, char **argv) { terom@47: // libevent init terom@47: struct event_base *ev_base = event_init(); terom@47: terom@47: if (!ev_base) terom@47: FATAL("event_init"); terom@47: terom@47: // process command-line arguments terom@47: int opt; terom@47: const char *listen_spec = NULL, *connect_spec = NULL; terom@47: terom@47: while ((opt = getopt(argc, argv, "hl:c:")) != -1) { terom@47: switch (opt) { terom@47: case 'l': terom@47: if (listen_spec) terom@47: FATAL("only specify -l once"); terom@47: terom@47: listen_spec = optarg; terom@47: terom@47: break; terom@47: terom@47: case 'c': terom@47: if (connect_spec) terom@47: FATAL("only specify -c once"); terom@47: terom@47: connect_spec = optarg; terom@47: terom@47: break; terom@47: terom@47: case 'h': terom@47: default: terom@47: err_exit("Usage: %s [-h] [-l listen] -c ", argv[0]); terom@47: terom@47: } terom@47: } terom@47: terom@47: if (!connect_spec) terom@47: err_exit("Must specify -c; see `%s -h`", argv[0]); terom@47: terom@47: // ignore SIGPIPE terom@47: struct sigaction sigpipe; terom@47: memset(&sigpipe, 0, sizeof(sigpipe)); terom@47: terom@47: sigpipe.sa_handler = SIG_IGN; terom@47: terom@47: sigaction(SIGPIPE, &sigpipe, NULL); terom@47: terom@47: // endpoints terom@47: struct config_endpoint listen_endpoint, connect_endpoint; terom@47: terom@47: endpoint_init(&listen_endpoint, LISTEN_PORT); terom@47: endpoint_init(&connect_endpoint, 0); terom@47: terom@47: if ( terom@47: endpoint_parse(&listen_endpoint, listen_spec) terom@47: || endpoint_parse(&connect_endpoint, connect_spec) terom@47: ) terom@47: goto error; terom@47: terom@47: // the listen socket terom@47: int listen_sock; terom@47: terom@47: if ((listen_sock = socket_listen(&listen_endpoint, SOCK_STREAM)) == -1) terom@47: goto error; terom@47: terom@47: // create the listen event terom@47: struct event listen_ev; terom@47: terom@47: event_set(&listen_ev, listen_sock, EV_READ | EV_PERSIST, &handle_accept, &connect_endpoint); terom@47: terom@47: if (event_add(&listen_ev, NULL)) terom@47: PERROR("event_add"); terom@47: terom@47: // we shall now run terom@47: INFO("run"); terom@47: terom@47: // run the libevent mainloop terom@47: if (event_base_dispatch(ev_base)) terom@47: WARNING("event_dispatch"); terom@47: terom@47: INFO("shutdown"); terom@47: terom@47: // cleanup terom@47: event_base_free(ev_base); terom@47: close(listen_sock); terom@47: terom@47: return EXIT_SUCCESS; terom@47: terom@47: error: terom@47: return EXIT_FAILURE; terom@47: } terom@47: