# HG changeset patch # User Tero Marttila # Date 1219882447 -10800 # Node ID a5c09677ca6fcde36019c73f17dffcbc74cc657e # Parent 8a832c0e01eef67f0e9ed196235a7dd2d3cc665b add the coro_test.c file, and update .hgignore a bit diff -r 8a832c0e01ee -r a5c09677ca6f .hgignore --- a/.hgignore Thu Aug 28 03:12:11 2008 +0300 +++ b/.hgignore Thu Aug 28 03:14:07 2008 +0300 @@ -5,5 +5,8 @@ lib/ data/* *_main +*_test Makefile.* +cache/proto1/ +cscope.out diff -r 8a832c0e01ee -r a5c09677ca6f coro_test.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/coro_test.c Thu Aug 28 03:14:07 2008 +0300 @@ -0,0 +1,477 @@ + +// splice +#define _GNU_SOURCE +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "common.h" +#include "config.h" +#include "socket.h" + +// what port to listen on +#define LISTEN_PORT 10559 + +// stack size to use for coroutines +#define CLIENT_STACK_SIZE 128 * 1024 +#define PIPELINE_STACK_SIZE 128 * 1024 + +// this should be equal to the system page size +#define PIPELINE_CHUNKSIZE 4096 + +struct co_connect_info { + // what co this is running as + coroutine_t co; + + // set to the value of errno in case of an error, zero otherwise + int error; +}; + +static void _co_connect_write (evutil_socket_t fd, short what, void *arg) { + struct co_connect_info *ctx = arg; + + // retreive the error code + // XXX: this function was lost in a hg-fuckup + if (socket_connect_error(fd, &ctx->error)) + ctx->error = -1; // mark it as failed + + // resume the co + co_call(ctx->co); +} + +// coroutine-connect +int co_connect (struct config_endpoint *endpoint, int sock_type, int *sock) { + // our state + struct co_connect_info ctx; + + // the event for the connect + struct event ev; + + // clean it + memset(&ctx, 0, sizeof(ctx)); + + // initiate the connect() + if ((*sock = socket_connect_async(endpoint, sock_type)) == -1) + goto error; + + // set up the event + event_set(&ev, *sock, EV_WRITE, &_co_connect_write, &ctx); + + if (event_add(&ev, NULL)) + goto error; + + // store our co handle in there + ctx.co = co_current(); + + // wait for the result + co_resume(); + + // return the error code + return ctx.error; + +error: + return -1; +} + +struct co_splice_info { + int fd_in, fd_out; + size_t len; + + short wait; + + struct event ev_read, ev_write; + + int error; + + coroutine_t co; +}; + +void _co_splice_ev (int fd, short what, void *arg) { + struct co_splice_info *ctx = arg; + + ctx->wait &= ~what; + + if (!ctx->wait) { + // both ends are readable/writeable! + + long ret; + + if ((ret = splice(ctx->fd_in, NULL, ctx->fd_out, NULL, ctx->len, SPLICE_F_MOVE | SPLICE_F_NONBLOCK)) == -1) { + if (errno == EAGAIN) { + ctx->error = 0; + ctx->len = 0; + + } else { + PERROR("splice"); + } + } else { + ctx->error = 0; + ctx->len = ret; + } + + // let the co do it's thing + co_call(ctx->co); + + } + + + // ok, done succesfully + return; + +error: + // let the co handle this error + ctx->error = errno; + co_call(ctx->co); +} + +// coroutine-splice +int co_splice (int fd_in, int fd_out, size_t *len, int *error) { + // first, try and splice directly + // XXX: ensure all sockets are O_NONBLOCK! + long ret; + + if ((ret = splice(fd_in, NULL, fd_out, NULL, *len, SPLICE_F_MOVE | SPLICE_F_NONBLOCK)) == -1) { + if (errno == EAGAIN) { + // wait for in/out to become readable/writeable + + // our state + ZINIT(struct co_splice_info, ctx); + + ctx.fd_in = fd_in; + ctx.fd_out = fd_out; + ctx.len = *len; + ctx.wait = EV_READ | EV_WRITE; + ctx.co = co_current(); + + // two events + event_set(&ctx.ev_read, fd_in, EV_READ, &_co_splice_ev, &ctx); + event_set(&ctx.ev_write, fd_out, EV_WRITE, &_co_splice_ev, &ctx); + + // add both + if ( + event_add(&ctx.ev_read, NULL) + || event_add(&ctx.ev_write, NULL) + ) + ERROR("event_add"); // XXX: cleanup + + // and wait... + co_resume(); + + // error + if (ctx.error) { + *error = ctx.error; + + return -1; + + } else { + *len = ctx.len; + + return 0; + } + } else + PERROR("splice"); + } else { + // success! + *len = ret; + + return 0; + } + +error: + return -1; +} + +/* +struct co_pipeline_info { + int sock_read, sock_write; + + int pipe_read, pipe_write; + + / * + * Two coroutines: + * * co_write waits for co_read to indicate that there is enough data in the pipe to send, whereupon it will + * attempt to splice data from pipe_read to sock_write. + * * co_read waits for data to become available on sock_src, splices it into pipe_write, and notifies co_write + * that there is data avilable for sending. + * / + coroutine_t co_read, co_write; + + char co_read_stack[PIPELINE_STACK_SIZE], co_write_stack[PIPELINE_STACK_SIZE]; +}; + +void _co_pipeline_read (void *arg) { + +} + +void _co_pipeline_write (void *arg) { + +} +*/ + +int co_pipeline (int sock_read, int sock_write) { +// ZINIT(struct co_pipeline_info, ctx); + + // create our pipe + int pipefds[2]; + + if (pipe(pipefds)) + PERROR("pipe"); + + // copy over the fds into ctx + int pipe_read = pipefds[0]; + int pipe_write = pipefds[1]; + + // start shuffling data around + size_t pipe_size = 0, read_len, write_len; + int error = 0; + + while (1) { + read_len = PIPELINE_CHUNKSIZE; + + if (co_splice(sock_read, pipe_write, &read_len, &error)) + PERROR("co_splice(sock_read, pipe_write, %zu)", read_len); + + if (!read_len) { + // EOF + break; + } + + pipe_size += read_len; + + if (!pipe_size) + continue; + + while (pipe_size > 0) { + write_len = pipe_size; + + if (co_splice(pipe_read, sock_write, &write_len, &error)) + PERROR("co_splice(pipe_read, sock_write, %zu", write_len); + + pipe_size -= write_len; + } + } + +/* + // create the two coros + if ( + (ctx.co_read = co_create(&_co_pipeline_read, &ctx, ctx.co_read_stack, PIPELINE_STACK_SIZE)) == NULL + || (ctx.co_write = co_create(&_co_pipeline_write, &ctx, ctx.co_write_stack, PIPELINE_STACK_SIZE)) == NULL + ) + ERROR("co_create"); + + // first start off the read one, which then eventually calls the write one + co_call(ctx.co_read); +*/ + + // success + return 0; + +error: +/* + if (ctx.co_read) + co_delete(ctx.co_read); + + if (ctx.co_write) + co_delete(ctx.co_write); +*/ + return -1; +} + +/* + * State needed to handle a client + */ +struct client_info { + // the client socket + evutil_socket_t socket; + + // the connect target endpoint + struct config_endpoint *connect_target; + + // the coroutine that handles it + coroutine_t co; +}; + +void client_co (void *arg) { + struct client_info *ctx = arg; + + // the outbound socket + int conn_sock = -1; + + // connect + if (co_connect(ctx->connect_target, SOCK_STREAM, &conn_sock)) + goto error; + + // pipe data around \o/ + if (co_pipeline(ctx->socket, conn_sock)) + goto error; + +error: + // close the sockets + if (conn_sock != -1) + if (close(conn_sock)) + PWARNING("close connect_socket"); + + if (close(ctx->socket)) + PWARNING("close listen_socket"); + + // free the ctx + free(ctx); + + // exit + co_exit(); +} + +static void handle_accept (evutil_socket_t fd, short event, void *arg) { + // the new client's client_info + struct client_info *ctx = NULL; + + // the socket from accept() + evutil_socket_t socket = -1; + + // not used + struct sockaddr_storage addr; + socklen_t addr_len; + + // arg is NULL and unused + (void) arg; + + // accept the connection + addr_len = sizeof(struct sockaddr_storage); + + if ((socket = accept(fd, (struct sockaddr *) &addr, &addr_len)) == -1) + PERROR("accept"); + + // alloc a new client_info + if (!(ctx = calloc(1, sizeof(*ctx)))) + ERROR("calloc"); + + // store the socket + ctx->socket = socket; + + // and the endpoint + ctx->connect_target = arg; + + // create the coroutine + if ((ctx->co = co_create(&client_co, ctx, NULL, CLIENT_STACK_SIZE)) == NULL) + ERROR("co_create"); + + // we can start up the coroutine right away + co_call(ctx->co); + + // done handling this accept + return; + +error: + if (ctx) { + if (ctx->co) + co_delete(ctx->co); + + free(ctx); + } + + else if (socket != -1) + close(socket); +} + + +int main (int argc, char **argv) { + // libevent init + struct event_base *ev_base = event_init(); + + if (!ev_base) + FATAL("event_init"); + + // process command-line arguments + int opt; + const char *listen_spec = NULL, *connect_spec = NULL; + + while ((opt = getopt(argc, argv, "hl:c:")) != -1) { + switch (opt) { + case 'l': + if (listen_spec) + FATAL("only specify -l once"); + + listen_spec = optarg; + + break; + + case 'c': + if (connect_spec) + FATAL("only specify -c once"); + + connect_spec = optarg; + + break; + + case 'h': + default: + err_exit("Usage: %s [-h] [-l listen] -c ", argv[0]); + + } + } + + if (!connect_spec) + err_exit("Must specify -c; see `%s -h`", argv[0]); + + // ignore SIGPIPE + struct sigaction sigpipe; + memset(&sigpipe, 0, sizeof(sigpipe)); + + sigpipe.sa_handler = SIG_IGN; + + sigaction(SIGPIPE, &sigpipe, NULL); + + // endpoints + struct config_endpoint listen_endpoint, connect_endpoint; + + endpoint_init(&listen_endpoint, LISTEN_PORT); + endpoint_init(&connect_endpoint, 0); + + if ( + endpoint_parse(&listen_endpoint, listen_spec) + || endpoint_parse(&connect_endpoint, connect_spec) + ) + goto error; + + // the listen socket + int listen_sock; + + if ((listen_sock = socket_listen(&listen_endpoint, SOCK_STREAM)) == -1) + goto error; + + // create the listen event + struct event listen_ev; + + event_set(&listen_ev, listen_sock, EV_READ | EV_PERSIST, &handle_accept, &connect_endpoint); + + if (event_add(&listen_ev, NULL)) + PERROR("event_add"); + + // we shall now run + INFO("run"); + + // run the libevent mainloop + if (event_base_dispatch(ev_base)) + WARNING("event_dispatch"); + + INFO("shutdown"); + + // cleanup + event_base_free(ev_base); + close(listen_sock); + + return EXIT_SUCCESS; + +error: + return EXIT_FAILURE; +} +